This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch cassandra-auth
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit ffad48fc868fc78520f8dcc369f23e64cac5b7c5 Author: Eamon Ford <[email protected]> AuthorDate: Tue Aug 4 12:47:31 2020 -0700 add cassandra auth and zookeeper support --- granule_ingester/docker/entrypoint.sh | 5 +- .../granule_ingester/exceptions/Exceptions.py | 41 +++++++++++ .../granule_ingester/exceptions/__init__.py | 11 +++ granule_ingester/granule_ingester/main.py | 61 ++++++++++++----- .../granule_ingester/writers/CassandraStore.py | 39 ++++++++--- .../granule_ingester/writers/SolrStore.py | 79 ++++++++++++++-------- granule_ingester/requirements.txt | 2 + 7 files changed, 183 insertions(+), 55 deletions(-) diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh index e6f7262..2b6174a 100644 --- a/granule_ingester/docker/entrypoint.sh +++ b/granule_ingester/docker/entrypoint.sh @@ -7,4 +7,7 @@ python /sdap/granule_ingester/main.py \ $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq_queue=$RABBITMQ_QUEUE) \ $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \ $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \ - $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT) + $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra_username=$CASSANDRA_USERNAME) \ + $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra_password=$CASSANDRA_PASSWORD) \ + $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT) \ + $([[ ! 
-z "$ZK_HOST_AND_PORT" ]] && echo --zk_host_and_port=$ZK_HOST_AND_PORT) diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py new file mode 100644 index 0000000..c648b99 --- /dev/null +++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py @@ -0,0 +1,41 @@ +class PipelineBuildingError(Exception): + pass + + +class PipelineRunningError(Exception): + pass + + +class TileProcessingError(Exception): + pass + + +class LostConnectionError(Exception): + pass + + +class RabbitMQLostConnectionError(LostConnectionError): + pass + + +class CassandraLostConnectionError(LostConnectionError): + pass + +class SolrLostConnectionError(LostConnectionError): + pass + + +class FailedHealthCheckError(Exception): + pass + + +class CassandraFailedHealthCheckError(FailedHealthCheckError): + pass + + +class SolrFailedHealthCheckError(FailedHealthCheckError): + pass + + +class RabbitMQFailedHealthCheckError(FailedHealthCheckError): + pass diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py new file mode 100644 index 0000000..ea0969f --- /dev/null +++ b/granule_ingester/granule_ingester/exceptions/__init__.py @@ -0,0 +1,11 @@ +from .Exceptions import CassandraFailedHealthCheckError +from .Exceptions import CassandraLostConnectionError +from .Exceptions import FailedHealthCheckError +from .Exceptions import LostConnectionError +from .Exceptions import PipelineBuildingError +from .Exceptions import PipelineRunningError +from .Exceptions import RabbitMQFailedHealthCheckError +from .Exceptions import RabbitMQLostConnectionError +from .Exceptions import SolrFailedHealthCheckError +from .Exceptions import SolrLostConnectionError +from .Exceptions import TileProcessingError diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py index 5a8fc2d..8b8d40f 100644 --- 
a/granule_ingester/granule_ingester/main.py +++ b/granule_ingester/granule_ingester/main.py @@ -16,23 +16,24 @@ import argparse import asyncio import logging +import sys from functools import partial from typing import List from granule_ingester.consumer import Consumer +from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError from granule_ingester.healthcheck import HealthCheck -from granule_ingester.writers import CassandraStore -from granule_ingester.writers import SolrStore +from granule_ingester.writers import CassandraStore, SolrStore -def cassandra_factory(contact_points, port): - store = CassandraStore(contact_points, port) +def cassandra_factory(contact_points, port, username, password): + store = CassandraStore(contact_points=contact_points, port=port, username=username, password=password) store.connect() return store -def solr_factory(solr_host_and_port): - store = SolrStore(solr_host_and_port) +def solr_factory(solr_host_and_port, zk_host_and_port): + store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port) store.connect() return store @@ -44,7 +45,7 @@ async def run_health_checks(dependencies: List[HealthCheck]): return True -async def main(): +async def main(loop): parser = argparse.ArgumentParser(description='Listen to RabbitMQ for granule ingestion instructions, and process ' 'and ingest a granule for each message that comes through.') parser.add_argument('--rabbitmq_host', @@ -72,10 +73,20 @@ async def main(): default=9042, metavar="PORT", help='Cassandra port. (Default: 9042)') + parser.add_argument('--cassandra_username', + metavar="USERNAME", + default=None, + help='Cassandra username. Optional.') + parser.add_argument('--cassandra_password', + metavar="PASSWORD", + default=None, + help='Cassandra password. Optional.') parser.add_argument('--solr_host_and_port', default='http://localhost:8983', metavar='HOST:PORT', help='Solr host and port. 
(Default: http://localhost:8983)') + parser.add_argument('--zk_host_and_port', + metavar="HOST:PORT") parser.add_argument('-v', '--verbose', action='store_true', @@ -96,24 +107,42 @@ async def main(): cassandra_contact_points = args.cassandra_contact_points cassandra_port = args.cassandra_port + cassandra_username = args.cassandra_username + cassandra_password = args.cassandra_password solr_host_and_port = args.solr_host_and_port + zk_host_and_port = args.zk_host_and_port consumer = Consumer(rabbitmq_host=args.rabbitmq_host, rabbitmq_username=args.rabbitmq_username, rabbitmq_password=args.rabbitmq_password, rabbitmq_queue=args.rabbitmq_queue, - data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port), - metadata_store_factory=partial(solr_factory, solr_host_and_port)) - if await run_health_checks( - [CassandraStore(cassandra_contact_points, cassandra_port), - SolrStore(solr_host_and_port), - consumer]): + data_store_factory=partial(cassandra_factory, + cassandra_contact_points, + cassandra_port, + cassandra_username, + cassandra_password), + metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port)) + try: + solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port) + await run_health_checks([CassandraStore(cassandra_contact_points, + cassandra_port, + cassandra_username, + cassandra_password), + solr_store, + consumer]) async with consumer: logger.info("All external dependencies have passed the health checks. Now listening to message queue.") await consumer.start_consuming() - else: - logger.error("Quitting because not all dependencies passed the health checks.") + except FailedHealthCheckError as e: + logger.error(f"Quitting because not all dependencies passed the health checks: {e}") + except LostConnectionError as e: + logger.error(f"{e} Any messages that were being processed have been re-queued. 
Quitting.") + except Exception as e: + logger.exception(f"Shutting down because of an unrecoverable error:\n{e}") + finally: + sys.exit(1) if __name__ == '__main__': - asyncio.run(main()) + loop = asyncio.get_event_loop() + loop.run_until_complete(main(loop)) diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py index 7a9f146..cb5232b 100644 --- a/granule_ingester/granule_ingester/writers/CassandraStore.py +++ b/granule_ingester/granule_ingester/writers/CassandraStore.py @@ -18,11 +18,14 @@ import asyncio import logging import uuid -from cassandra.cluster import Cluster, Session +from cassandra.auth import PlainTextAuthProvider +from cassandra.cluster import Cluster, Session, NoHostAvailable from cassandra.cqlengine import columns from cassandra.cqlengine.models import Model +from cassandra.policies import RetryPolicy, ConstantReconnectionPolicy from nexusproto.DataTile_pb2 import NexusTile, TileData +from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraLostConnectionError from granule_ingester.writers.DataStore import DataStore logging.getLogger('cassandra').setLevel(logging.INFO) @@ -37,8 +40,10 @@ class TileModel(Model): class CassandraStore(DataStore): - def __init__(self, contact_points=None, port=9042): + def __init__(self, contact_points=None, port=9042, username=None, password=None): self._contact_points = contact_points + self._username = username + self._password = password self._port = port self._session = None @@ -47,12 +52,22 @@ class CassandraStore(DataStore): session = self._get_session() session.shutdown() return True - except: - logger.error("Cannot connect to Cassandra!") - return False + except Exception: + raise CassandraFailedHealthCheckError("Cannot connect to Cassandra!") def _get_session(self) -> Session: - cluster = Cluster(contact_points=self._contact_points, port=self._port) + + if self._username and self._password: + 
auth_provider = PlainTextAuthProvider(username=self._username, password=self._password) + else: + auth_provider = None + + cluster = Cluster(contact_points=self._contact_points, + port=self._port, + # load_balancing_policy= + reconnection_policy=ConstantReconnectionPolicy(delay=5.0), + default_retry_policy=RetryPolicy(), + auth_provider=auth_provider) session = cluster.connect() session.set_keyspace('nexustiles') return session @@ -65,10 +80,14 @@ class CassandraStore(DataStore): self._session.shutdown() async def save_data(self, tile: NexusTile) -> None: - tile_id = uuid.UUID(tile.summary.tile_id) - serialized_tile_data = TileData.SerializeToString(tile.tile) - prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)") - await type(self)._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)]) + try: + tile_id = uuid.UUID(tile.summary.tile_id) + serialized_tile_data = TileData.SerializeToString(tile.tile) + prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)") + await self._execute_query_async(self._session, prepared_query, + [tile_id, bytearray(serialized_tile_data)]) + except NoHostAvailable: + raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.") @staticmethod async def _execute_query_async(session: Session, query, parameters=None): diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py index 9d6a7f0..276a988 100644 --- a/granule_ingester/granule_ingester/writers/SolrStore.py +++ b/granule_ingester/granule_ingester/writers/SolrStore.py @@ -13,64 +13,87 @@ # See the License for the specific language governing permissions and # limitations under the License. 
- -from asyncio import AbstractEventLoop - +import asyncio +import functools +import json import logging +from asyncio import AbstractEventLoop from datetime import datetime from pathlib import Path from typing import Dict -import aiohttp +import pysolr +from kazoo.handlers.threading import KazooTimeoutError +from kazoo.exceptions import NoNodeError from nexusproto.DataTile_pb2 import * -from tenacity import * +from granule_ingester.exceptions import SolrFailedHealthCheckError, SolrLostConnectionError from granule_ingester.writers.MetadataStore import MetadataStore logger = logging.getLogger(__name__) +def run_in_executor(f): + @functools.wraps(f) + def inner(*args, **kwargs): + loop = asyncio.get_running_loop() + return loop.run_in_executor(None, lambda: f(*args, **kwargs)) + + return inner + + class SolrStore(MetadataStore): - def __init__(self, host_and_port='http://localhost:8983'): + def __init__(self, solr_url=None, zk_url=None): super().__init__() self.TABLE_NAME = "sea_surface_temp" self.iso: str = '%Y-%m-%dT%H:%M:%SZ' - self._host_and_port = host_and_port + self._solr_url = solr_url + self._zk_url = zk_url self.geo_precision: int = 3 - self.collection: str = "nexustiles" + self._collection: str = "nexustiles" self.log: logging.Logger = logging.getLogger(__name__) self.log.setLevel(logging.DEBUG) - self._session = None + self._solr = None + + def _get_connection(self) -> pysolr.Solr: + if self._zk_url: + zk = pysolr.ZooKeeper(f"{self._zk_url}") + collections = {} + for c in zk.zk.get_children("collections"): + collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii"))) + zk.collections = collections + return pysolr.SolrCloud(zk, self._collection, always_commit=True) + elif self._solr_url: + return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True) + else: + raise RuntimeError("You must provide either solr_host or zookeeper_host.") def connect(self, loop: AbstractEventLoop = None): - 
self._session = aiohttp.ClientSession(loop=loop) + self._solr = self._get_connection() async def health_check(self): try: - async with aiohttp.ClientSession() as session: - response = await session.get('{}/solr/{}/admin/ping'.format(self._host_and_port, self.collection)) - if response.status == 200: - return True - else: - logger.error("Solr health check returned status {}.".format(response.status)) - except aiohttp.ClientConnectionError as e: - logger.error("Cannot connect to Solr!") - - return False + connection = self._get_connection() + connection.ping() + except pysolr.SolrError: + raise SolrFailedHealthCheckError("Cannot connect to Solr!") + except NoNodeError: + raise SolrFailedHealthCheckError("Connected to Zookeeper but cannot connect to Solr!") + except KazooTimeoutError: + raise SolrFailedHealthCheckError("Cannot connect to Zookeeper!") async def save_metadata(self, nexus_tile: NexusTile) -> None: solr_doc = self._build_solr_doc(nexus_tile) + await self._save_document(solr_doc) - await self._save_document(self.collection, solr_doc) - - @retry(stop=stop_after_attempt(5)) - async def _save_document(self, collection: str, doc: dict): - url = '{}/solr/{}/update/json/docs?commit=true'.format(self._host_and_port, collection) - response = await self._session.post(url, json=doc) - if response.status < 200 or response.status >= 400: - raise RuntimeError("Saving data to Solr failed with HTTP status code {}".format(response.status)) + @run_in_executor + def _save_document(self, doc: dict): + try: + self._solr.add([doc]) + except pysolr.SolrError: + raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.") def _build_solr_doc(self, tile: NexusTile) -> Dict: summary: TileSummary = tile.summary diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt index 4d9d4cb..16f83bf 100644 --- a/granule_ingester/requirements.txt +++ b/granule_ingester/requirements.txt @@ -1,3 +1,5 @@ cassandra-driver==3.23.0 +pysolr==3.9.0 
+kazoo==2.8.0 aiomultiprocess aioboto3
