This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 67703003e505095edc6eab8843e899d20df62347 Author: Eamon Ford <[email protected]> AuthorDate: Mon Jul 27 19:31:17 2020 -0700 solr history bug fixes --- granule_ingester/conda-requirements.txt | 2 +- granule_ingester/granule_ingester/consumer/Consumer.py | 1 + granule_ingester/granule_ingester/main.py | 2 ++ granule_ingester/granule_ingester/pipeline/Pipeline.py | 9 ++++++++- granule_ingester/granule_ingester/writers/SolrStore.py | 2 +- 5 files changed, 13 insertions(+), 3 deletions(-) diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt index b2af149..fafd6f3 100644 --- a/granule_ingester/conda-requirements.txt +++ b/granule_ingester/conda-requirements.txt @@ -6,5 +6,5 @@ xarray pyyaml==5.3.1 requests==2.23.0 aiohttp==3.6.2 -aio-pika +aio-pika==6.6.1 tenacity diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py index d40b54c..d5f1d97 100644 --- a/granule_ingester/granule_ingester/consumer/Consumer.py +++ b/granule_ingester/granule_ingester/consumer/Consumer.py @@ -104,4 +104,5 @@ class Consumer(HealthCheck): raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.") except Exception as e: await queue_iter.close() + await channel.close() raise e diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py index 87a6d5a..e50a395 100644 --- a/granule_ingester/granule_ingester/main.py +++ b/granule_ingester/granule_ingester/main.py @@ -109,6 +109,8 @@ async def main(loop): cassandra_password = args.cassandra_password cassandra_contact_points = args.cassandra_contact_points cassandra_port = args.cassandra_port + cassandra_username = args.cassandra_username + cassandra_password = args.cassandra_password solr_host_and_port = args.solr_host_and_port zk_host_and_port = args.zk_host_and_port diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py 
b/granule_ingester/granule_ingester/pipeline/Pipeline.py index a667d5e..2181da2 100644 --- a/granule_ingester/granule_ingester/pipeline/Pipeline.py +++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +import os import pickle import time from multiprocessing import Manager @@ -99,6 +100,8 @@ class Pipeline: self._data_store_factory = data_store_factory self._metadata_store_factory = metadata_store_factory + self._max_concurrency = int(os.getenv('MAX_CONCURRENCY', 16)) + # Create a SyncManager so that we can to communicate exceptions from the # worker processes back to the main process. self._manager = Manager() @@ -161,6 +164,8 @@ class Pipeline: return processor_module async def run(self): + + logger.info(f"Running pipeline with {self._max_concurrency} threads per process") async with self._granule_loader as (dataset, granule_name): start = time.perf_counter() @@ -170,7 +175,9 @@ class Pipeline: dataset, self._data_store_factory, self._metadata_store_factory, - shared_memory)) as pool: + shared_memory), + maxtasksperchild=self._max_concurrency, + childconcurrency=self._max_concurrency) as pool: serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in self._slicer.generate_tiles(dataset, granule_name)] # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py index 926a75c..276a988 100644 --- a/granule_ingester/granule_ingester/writers/SolrStore.py +++ b/granule_ingester/granule_ingester/writers/SolrStore.py @@ -59,7 +59,7 @@ class SolrStore(MetadataStore): def _get_connection(self) -> pysolr.Solr: if self._zk_url: - zk = pysolr.ZooKeeper(f"{self._zk_url}/solr") + zk = pysolr.ZooKeeper(f"{self._zk_url}") collections = {} for c in zk.zk.get_children("collections"): 
collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))
