This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit cdd30aef489f6e7d414b09d84ff970f120d2c1ea
Author: Eamon Ford <[email protected]>
AuthorDate: Mon Jul 27 19:31:17 2020 -0700

    solr history bug fixes
---
 collection_manager/collection_manager/main.py      |  2 +-
 .../history_manager/SolrIngestionHistory.py        | 19 +++++++---------
 granule_ingester/conda-requirements.txt            |  2 +-
 granule_ingester/docker/entrypoint.sh              |  4 +++-
 .../granule_ingester/consumer/Consumer.py          |  1 +
 granule_ingester/granule_ingester/main.py          | 25 ++++++++++++++++++----
 .../granule_ingester/pipeline/Pipeline.py          |  9 +++++++-
 .../granule_ingester/writers/CassandraStore.py     | 14 ++++++++++--
 .../granule_ingester/writers/SolrStore.py          |  2 +-
 9 files changed, 56 insertions(+), 22 deletions(-)

diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 7e72de5..43b687e 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -84,7 +84,7 @@ async def main():
             return
         except Exception as e:
-            logger.error(e)
+            logger.exception(e)
             return
diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index 1ae7156..4e6d3e5 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -35,8 +35,8 @@ class SolrIngestionHistory(IngestionHistory):
         try:
             self._solr_url = solr_url
             self._create_collection_if_needed()
-            self._solr_granules = pysolr.Solr('/'.join([solr_url.strip('/'), self._granule_collection_name]))
-            self._solr_datasets = pysolr.Solr('/'.join([solr_url.strip('/'), self._dataset_collection_name]))
+            self._solr_granules = pysolr.Solr(f"{solr_url.strip('/')}/solr/{self._granule_collection_name}")
+            self._solr_datasets = pysolr.Solr(f"{solr_url.strip('/')}/solr/{self._dataset_collection_name}")
             self._dataset_id = dataset_id
             self._signature_fun = md5sum_from_filepath if signature_fun is None else signature_fun
             self._latest_ingested_file_update = self._get_latest_file_update()
@@ -63,7 +63,7 @@ class SolrIngestionHistory(IngestionHistory):
             self._solr_datasets.add([{
                 'id': self._dataset_id,
                 'dataset_s': self._dataset_id,
-                'latest_update_l': self._latest_ingested_file_update}])
+                'latest_update_l': int(self._latest_ingested_file_update)}])
             self._solr_datasets.commit()

     def _get_latest_file_update(self):
@@ -87,8 +87,7 @@ class SolrIngestionHistory(IngestionHistory):
             self._req_session = requests.session()

         payload = {'action': 'CLUSTERSTATUS'}
-        result = self._req_session.get('/'.join([self._solr_url.strip('/'), 'admin', 'collections']),
-                                       params=payload)
+        result = self._req_session.get(f"{self._solr_url.strip('/')}/solr/admin/collections", params=payload)
         response = result.json()

         node_number = len(response['cluster']['live_nodes'])
@@ -100,12 +99,11 @@ class SolrIngestionHistory(IngestionHistory):
                 'name': self._granule_collection_name,
                 'numShards': node_number
             }
-            result = self._req_session.get('/'.join([self._solr_url.strip("/"), 'admin', 'collections']),
-                                           params=payload)
+            result = self._req_session.get(f"{self._solr_url.strip('/')}/solr/admin/collections", params=payload)
             response = result.json()
             logger.info(f"solr collection created {response}")

             # Update schema
-            schema_url = '/'.join([self._solr_url.strip('/'), self._granule_collection_name, 'schema'])
+            schema_url = f"{self._solr_url.strip('/')}/{self._granule_collection_name}/schema"
             # granule_s # dataset_s so that all the granule of a dataset are less likely to be on the same shard
             # self.add_unique_key_field(schema_url, "uniqueKey_s", "StrField")
             self._add_field(schema_url, "dataset_s", "StrField")
@@ -121,13 +119,12 @@ class SolrIngestionHistory(IngestionHistory):
                 'name': self._dataset_collection_name,
                 'numShards': node_number
             }
-            result = self._req_session.get('/'.join([self._solr_url.strip('/'), 'admin', 'collections']),
-                                           params=payload)
+            result = self._req_session.get(f"{self._solr_url.strip('/')}/solr/admin/collections", params=payload)
             response = result.json()
             logger.info(f"solr collection created {response}")

             # Update schema
             # http://localhost:8983/solr/nexusdatasets/schema?_=1588555874864&wt=json
-            schema_url = '/'.join([self._solr_url.strip('/'), self._dataset_collection_name, 'schema'])
+            schema_url = f"{self._solr_url.strip('/')}/{self._granule_collection_name}/schema"
             # self.add_unique_key_field(schema_url, "uniqueKey_s", "StrField")
             self._add_field(schema_url, "dataset_s", "StrField")
             self._add_field(schema_url, "latest_update_l", "TrieLongField")
diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
index b2af149..fafd6f3 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -6,5 +6,5 @@ xarray
 pyyaml==5.3.1
 requests==2.23.0
 aiohttp==3.6.2
-aio-pika
+aio-pika==6.6.1
 tenacity
diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
index 3a1cb9b..2b6174a 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -7,5 +7,7 @@ python /sdap/granule_ingester/main.py \
   $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq_queue=$RABBITMQ_QUEUE) \
   $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \
   $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \
-  $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT)
+  $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra_username=$CASSANDRA_USERNAME) \
+  $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra_password=$CASSANDRA_PASSWORD) \
+  $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT) \
   $([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo --zk_host_and_port=$ZK_HOST_AND_PORT)
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py
index d40b54c..d5f1d97 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -104,4 +104,5 @@ class Consumer(HealthCheck):
                 raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.")
             except Exception as e:
                 await queue_iter.close()
+                await channel.close()
                 raise e
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index f602da8..8b8d40f 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -26,8 +26,8 @@ from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.writers import CassandraStore, SolrStore


-def cassandra_factory(contact_points, port):
-    store = CassandraStore(contact_points, port)
+def cassandra_factory(contact_points, port, username, password):
+    store = CassandraStore(contact_points=contact_points, port=port, username=username, password=password)
     store.connect()
     return store

@@ -73,6 +73,14 @@ async def main(loop):
                         default=9042,
                         metavar="PORT",
                         help='Cassandra port. (Default: 9042)')
+    parser.add_argument('--cassandra_username',
+                        metavar="USERNAME",
+                        default=None,
+                        help='Cassandra username. Optional.')
+    parser.add_argument('--cassandra_password',
+                        metavar="PASSWORD",
+                        default=None,
+                        help='Cassandra password. Optional.')
     parser.add_argument('--solr_host_and_port',
                         default='http://localhost:8983',
                         metavar='HOST:PORT',
@@ -99,6 +107,8 @@ async def main(loop):

     cassandra_contact_points = args.cassandra_contact_points
     cassandra_port = args.cassandra_port
+    cassandra_username = args.cassandra_username
+    cassandra_password = args.cassandra_password
     solr_host_and_port = args.solr_host_and_port
     zk_host_and_port = args.zk_host_and_port

@@ -106,11 +116,18 @@ async def main(loop):
                         rabbitmq_username=args.rabbitmq_username,
                         rabbitmq_password=args.rabbitmq_password,
                         rabbitmq_queue=args.rabbitmq_queue,
-                        data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port),
+                        data_store_factory=partial(cassandra_factory,
+                                                   cassandra_contact_points,
+                                                   cassandra_port,
+                                                   cassandra_username,
+                                                   cassandra_password),
                         metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port))
     try:
         solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
-        await run_health_checks([CassandraStore(cassandra_contact_points, cassandra_port),
+        await run_health_checks([CassandraStore(cassandra_contact_points,
+                                                cassandra_port,
+                                                cassandra_username,
+                                                cassandra_password),
                                  solr_store,
                                  consumer])
         async with consumer:
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index a667d5e..2181da2 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -14,6 +14,7 @@
 # limitations under the License.

 import logging
+import os
 import pickle
 import time
 from multiprocessing import Manager
@@ -99,6 +100,8 @@ class Pipeline:
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory

+        self._max_concurrency = int(os.getenv('MAX_CONCURRENCY', 16))
+
         # Create a SyncManager so that we can to communicate exceptions from the
         # worker processes back to the main process.
         self._manager = Manager()
@@ -161,6 +164,8 @@ class Pipeline:
         return processor_module

     async def run(self):
+
+        logger.info(f"Running pipeline with {self._max_concurrency} threads per process")
         async with self._granule_loader as (dataset, granule_name):
             start = time.perf_counter()

@@ -170,7 +175,9 @@ class Pipeline:
                                               dataset,
                                               self._data_store_factory,
                                               self._metadata_store_factory,
-                                              shared_memory)) as pool:
+                                              shared_memory),
+                              maxtasksperchild=self._max_concurrency,
+                              childconcurrency=self._max_concurrency) as pool:
                 serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
                                     self._slicer.generate_tiles(dataset, granule_name)]
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 6b2cf32..cb5232b 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -18,6 +18,7 @@ import asyncio
 import logging
 import uuid

+from cassandra.auth import PlainTextAuthProvider
 from cassandra.cluster import Cluster, Session, NoHostAvailable
 from cassandra.cqlengine import columns
 from cassandra.cqlengine.models import Model
@@ -39,8 +40,10 @@ class TileModel(Model):


 class CassandraStore(DataStore):
-    def __init__(self, contact_points=None, port=9042):
+    def __init__(self, contact_points=None, port=9042, username=None, password=None):
         self._contact_points = contact_points
+        self._username = username
+        self._password = password
         self._port = port
         self._session = None

@@ -53,11 +56,18 @@ class CassandraStore(DataStore):
             raise CassandraFailedHealthCheckError("Cannot connect to Cassandra!")

     def _get_session(self) -> Session:
+
+        if self._username and self._password:
+            auth_provider = PlainTextAuthProvider(username=self._username, password=self._password)
+        else:
+            auth_provider = None
+
         cluster = Cluster(contact_points=self._contact_points,
                           port=self._port,
                           # load_balancing_policy=
                           reconnection_policy=ConstantReconnectionPolicy(delay=5.0),
-                          default_retry_policy=RetryPolicy())
+                          default_retry_policy=RetryPolicy(),
+                          auth_provider=auth_provider)
         session = cluster.connect()
         session.set_keyspace('nexustiles')
         return session
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 926a75c..276a988 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -59,7 +59,7 @@ class SolrStore(MetadataStore):

     def _get_connection(self) -> pysolr.Solr:
         if self._zk_url:
-            zk = pysolr.ZooKeeper(f"{self._zk_url}/solr")
+            zk = pysolr.ZooKeeper(f"{self._zk_url}")
             collections = {}
             for c in zk.zk.get_children("collections"):
                 collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))
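A few illustrative sketches of the changes above follow; none of the snippets are from the repository, and any names marked as placeholders are assumptions.

The collection_manager change swaps logger.error(e) for logger.exception(e), so the active traceback is recorded along with the message. A minimal sketch of the difference, using a made-up do_work() helper:

    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    def do_work():
        raise ValueError("boom")          # stand-in for a failing collection update

    try:
        do_work()
    except Exception as e:
        logger.error(e)        # logs only: "boom"
        logger.exception(e)    # logs "boom" plus the full traceback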
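The SolrIngestionHistory fixes insert the '/solr' path segment so pysolr targets the collection endpoint rather than the bare host, and cast the latest-update timestamp to int before writing it to latest_update_l (with Solr's default dynamic-field rules, *_l is a long field). A rough sketch of the resulting usage, assuming a local Solr at http://localhost:8983; the collection name and document values are illustrative, not taken from the commit:

    import time
    import pysolr

    solr_url = 'http://localhost:8983'
    collection = 'nexusgranules'   # illustrative collection name

    # After the fix the client targets http://localhost:8983/solr/nexusgranules
    granules = pysolr.Solr(f"{solr_url.strip('/')}/solr/{collection}")

    granules.add([{
        'id': 'example_dataset',
        'dataset_s': 'example_dataset',
        'latest_update_l': int(time.time()),   # long field, so cast the float timestamp
    }])
    granules.commit()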
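In Consumer.py the error path now closes the RabbitMQ channel in addition to the queue iterator, so a failed granule does not leave a dangling channel behind. A sketch of that connect/consume/cleanup shape with aio-pika 6.x; the broker URL and queue name are placeholders and the message handling is elided:

    import asyncio
    import aio_pika

    async def consume():
        connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
        channel = await connection.channel()
        queue = await channel.declare_queue("nexus-messages", durable=True)
        queue_iter = queue.iterator()
        try:
            async for message in queue_iter:
                async with message.process():
                    ...  # process the granule message
        except Exception:
            # mirror the commit: shut down both the iterator and the channel on failure
            await queue_iter.close()
            await channel.close()
            raise
        finally:
            await connection.close()

    asyncio.run(consume())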
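CassandraStore and main.py now thread an optional username/password pair through to the driver: when both are set, _get_session() builds a PlainTextAuthProvider and hands it to the Cluster, otherwise auth_provider stays None and behavior is unchanged. A sketch of how the new constructor might be exercised; the contact point and credentials below are placeholders, not values from the commit:

    from granule_ingester.writers import CassandraStore

    store = CassandraStore(contact_points=['127.0.0.1'],
                           port=9042,
                           username='cassandra',    # hypothetical credentials
                           password='cassandra')
    store.connect()   # the Cluster is built with PlainTextAuthProvider because both are set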
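Pipeline.run() now reads MAX_CONCURRENCY from the environment (default 16) and passes it to the aiomultiprocess pool as both maxtasksperchild (how many tasks a child process takes before being recycled) and childconcurrency (how many coroutines each child runs at once). A self-contained sketch of the same pattern with a made-up worker coroutine; nothing below comes from the ingester itself:

    import asyncio
    import os

    from aiomultiprocess import Pool

    MAX_CONCURRENCY = int(os.getenv('MAX_CONCURRENCY', 16))

    async def process_tile(tile_id):
        # stand-in for the real per-tile pipeline work
        await asyncio.sleep(0.01)
        return tile_id

    async def main():
        async with Pool(maxtasksperchild=MAX_CONCURRENCY,
                        childconcurrency=MAX_CONCURRENCY) as pool:
            results = await pool.map(process_tile, range(64))
        print(f"processed {len(results)} tiles")

    if __name__ == '__main__':
        asyncio.run(main())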
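SolrStore now passes ZK_HOST_AND_PORT to pysolr.ZooKeeper as-is instead of appending '/solr', so any chroot has to be part of the configured value. A rough sketch of connecting through ZooKeeper under that assumption; the host string and collection name are placeholders, and SolrCloud is just one way to use the handle (SolrStore itself reads collection state directly from the ZooKeeper tree):

    import pysolr

    # The chroot ('/solr' here) now lives in the configured value itself.
    zk_host_and_port = 'localhost:2181/solr'

    zk = pysolr.ZooKeeper(zk_host_and_port)
    solr = pysolr.SolrCloud(zk, 'nexustiles')   # placeholder collection name
    results = solr.search('*:*', rows=1)
    print(results.hits)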
