This is an automated email from the ASF dual-hosted git repository. eamonford pushed a commit to branch rabbitmq-fix in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 60c383ba87b21da055c9990aeb5b1afb85f80ab6 Author: Eamon Ford <[email protected]> AuthorDate: Mon Jul 27 19:31:17 2020 -0700 solr history bug fixes --- granule_ingester/conda-requirements.txt | 2 +- granule_ingester/docker/entrypoint.sh | 4 +++- .../granule_ingester/consumer/Consumer.py | 1 + granule_ingester/granule_ingester/main.py | 25 ++++++++++++++++++---- .../granule_ingester/pipeline/Pipeline.py | 9 +++++++- .../granule_ingester/writers/CassandraStore.py | 14 ++++++++++-- .../granule_ingester/writers/SolrStore.py | 2 +- 7 files changed, 47 insertions(+), 10 deletions(-) diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt index b2af149..fafd6f3 100644 --- a/granule_ingester/conda-requirements.txt +++ b/granule_ingester/conda-requirements.txt @@ -6,5 +6,5 @@ xarray pyyaml==5.3.1 requests==2.23.0 aiohttp==3.6.2 -aio-pika +aio-pika==6.6.1 tenacity diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh index 3a1cb9b..2b6174a 100644 --- a/granule_ingester/docker/entrypoint.sh +++ b/granule_ingester/docker/entrypoint.sh @@ -7,5 +7,7 @@ python /sdap/granule_ingester/main.py \ $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq_queue=$RABBITMQ_QUEUE) \ $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \ $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \ - $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT) + $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra_username=$CASSANDRA_USERNAME) \ + $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra_password=$CASSANDRA_PASSWORD) \ + $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT) \ $([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo --zk_host_and_port=$ZK_HOST_AND_PORT) diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py index d40b54c..d5f1d97 100644 --- a/granule_ingester/granule_ingester/consumer/Consumer.py +++ b/granule_ingester/granule_ingester/consumer/Consumer.py @@ -104,4 +104,5 @@ class Consumer(HealthCheck): raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.") except Exception as e: await queue_iter.close() + await channel.close() raise e diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py index f602da8..8b8d40f 100644 --- a/granule_ingester/granule_ingester/main.py +++ b/granule_ingester/granule_ingester/main.py @@ -26,8 +26,8 @@ from granule_ingester.healthcheck import HealthCheck from granule_ingester.writers import CassandraStore, SolrStore -def cassandra_factory(contact_points, port): - store = CassandraStore(contact_points, port) +def cassandra_factory(contact_points, port, username, password): + store = CassandraStore(contact_points=contact_points, port=port, username=username, password=password) store.connect() return store @@ -73,6 +73,14 @@ async def main(loop): default=9042, metavar="PORT", help='Cassandra port. (Default: 9042)') + parser.add_argument('--cassandra_username', + metavar="USERNAME", + default=None, + help='Cassandra username. Optional.') + parser.add_argument('--cassandra_password', + metavar="PASSWORD", + default=None, + help='Cassandra password. Optional.') parser.add_argument('--solr_host_and_port', default='http://localhost:8983', metavar='HOST:PORT', @@ -99,6 +107,8 @@ async def main(loop): cassandra_contact_points = args.cassandra_contact_points cassandra_port = args.cassandra_port + cassandra_username = args.cassandra_username + cassandra_password = args.cassandra_password solr_host_and_port = args.solr_host_and_port zk_host_and_port = args.zk_host_and_port @@ -106,11 +116,18 @@ async def main(loop): rabbitmq_username=args.rabbitmq_username, rabbitmq_password=args.rabbitmq_password, rabbitmq_queue=args.rabbitmq_queue, - data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port), + data_store_factory=partial(cassandra_factory, + cassandra_contact_points, + cassandra_port, + cassandra_username, + cassandra_password), metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port)) try: solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port) - await run_health_checks([CassandraStore(cassandra_contact_points, cassandra_port), + await run_health_checks([CassandraStore(cassandra_contact_points, + cassandra_port, + cassandra_username, + cassandra_password), solr_store, consumer]) async with consumer: diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py index a667d5e..2181da2 100644 --- a/granule_ingester/granule_ingester/pipeline/Pipeline.py +++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +import os import pickle import time from multiprocessing import Manager @@ -99,6 +100,8 @@ class Pipeline: self._data_store_factory = data_store_factory self._metadata_store_factory = metadata_store_factory + self._max_concurrency = int(os.getenv('MAX_CONCURRENCY', 16)) + # Create a SyncManager so that we can to communicate exceptions from the # worker processes back to the main process. self._manager = Manager() @@ -161,6 +164,8 @@ class Pipeline: return processor_module async def run(self): + + logger.info(f"Running pipeline with {self._max_concurrency} threads per process") async with self._granule_loader as (dataset, granule_name): start = time.perf_counter() @@ -170,7 +175,9 @@ class Pipeline: dataset, self._data_store_factory, self._metadata_store_factory, - shared_memory)) as pool: + shared_memory), + maxtasksperchild=self._max_concurrency, + childconcurrency=self._max_concurrency) as pool: serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in self._slicer.generate_tiles(dataset, granule_name)] # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py index 6b2cf32..cb5232b 100644 --- a/granule_ingester/granule_ingester/writers/CassandraStore.py +++ b/granule_ingester/granule_ingester/writers/CassandraStore.py @@ -18,6 +18,7 @@ import asyncio import logging import uuid +from cassandra.auth import PlainTextAuthProvider from cassandra.cluster import Cluster, Session, NoHostAvailable from cassandra.cqlengine import columns from cassandra.cqlengine.models import Model @@ -39,8 +40,10 @@ class TileModel(Model): class CassandraStore(DataStore): - def __init__(self, contact_points=None, port=9042): + def __init__(self, contact_points=None, port=9042, username=None, password=None): self._contact_points = contact_points + self._username = username + self._password = password self._port = port self._session = None @@ -53,11 +56,18 @@ class CassandraStore(DataStore): raise CassandraFailedHealthCheckError("Cannot connect to Cassandra!") def _get_session(self) -> Session: + + if self._username and self._password: + auth_provider = PlainTextAuthProvider(username=self._username, password=self._password) + else: + auth_provider = None + cluster = Cluster(contact_points=self._contact_points, port=self._port, # load_balancing_policy= reconnection_policy=ConstantReconnectionPolicy(delay=5.0), - default_retry_policy=RetryPolicy()) + default_retry_policy=RetryPolicy(), + auth_provider=auth_provider) session = cluster.connect() session.set_keyspace('nexustiles') return session diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py index 926a75c..276a988 100644 --- a/granule_ingester/granule_ingester/writers/SolrStore.py +++ b/granule_ingester/granule_ingester/writers/SolrStore.py @@ -59,7 +59,7 @@ class SolrStore(MetadataStore): def _get_connection(self) -> pysolr.Solr: if self._zk_url: - zk = pysolr.ZooKeeper(f"{self._zk_url}/solr") + zk = pysolr.ZooKeeper(f"{self._zk_url}") collections = {} for c in zk.zk.get_children("collections"): collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))
