This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 60c383ba87b21da055c9990aeb5b1afb85f80ab6
Author: Eamon Ford <[email protected]>
AuthorDate: Mon Jul 27 19:31:17 2020 -0700

    solr history bug fixes
---
 granule_ingester/conda-requirements.txt            |  2 +-
 granule_ingester/docker/entrypoint.sh              |  4 +++-
 .../granule_ingester/consumer/Consumer.py          |  1 +
 granule_ingester/granule_ingester/main.py          | 25 ++++++++++++++++++----
 .../granule_ingester/pipeline/Pipeline.py          |  9 +++++++-
 .../granule_ingester/writers/CassandraStore.py     | 14 ++++++++++--
 .../granule_ingester/writers/SolrStore.py          |  2 +-
 7 files changed, 47 insertions(+), 10 deletions(-)

diff --git a/granule_ingester/conda-requirements.txt 
b/granule_ingester/conda-requirements.txt
index b2af149..fafd6f3 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -6,5 +6,5 @@ xarray
 pyyaml==5.3.1
 requests==2.23.0
 aiohttp==3.6.2
-aio-pika
+aio-pika==6.6.1
 tenacity
diff --git a/granule_ingester/docker/entrypoint.sh 
b/granule_ingester/docker/entrypoint.sh
index 3a1cb9b..2b6174a 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -7,5 +7,7 @@ python /sdap/granule_ingester/main.py \
   $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq_queue=$RABBITMQ_QUEUE) \
   $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo 
--cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \
   $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \
-  $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo 
--solr_host_and_port=$SOLR_HOST_AND_PORT)
+  $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo 
--cassandra_username=$CASSANDRA_USERNAME) \
+  $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo 
--cassandra_password=$CASSANDRA_PASSWORD) \
+  $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo 
--solr_host_and_port=$SOLR_HOST_AND_PORT) \
   $([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo 
--zk_host_and_port=$ZK_HOST_AND_PORT)
diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py 
b/granule_ingester/granule_ingester/consumer/Consumer.py
index d40b54c..d5f1d97 100644
--- a/granule_ingester/granule_ingester/consumer/Consumer.py
+++ b/granule_ingester/granule_ingester/consumer/Consumer.py
@@ -104,4 +104,5 @@ class Consumer(HealthCheck):
                 raise RabbitMQLostConnectionError("Lost connection to RabbitMQ 
while processing a granule.")
             except Exception as e:
                 await queue_iter.close()
+                await channel.close()
                 raise e
diff --git a/granule_ingester/granule_ingester/main.py 
b/granule_ingester/granule_ingester/main.py
index f602da8..8b8d40f 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -26,8 +26,8 @@ from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.writers import CassandraStore, SolrStore
 
 
-def cassandra_factory(contact_points, port):
-    store = CassandraStore(contact_points, port)
+def cassandra_factory(contact_points, port, username, password):
+    store = CassandraStore(contact_points=contact_points, port=port, 
username=username, password=password)
     store.connect()
     return store
 
@@ -73,6 +73,14 @@ async def main(loop):
                         default=9042,
                         metavar="PORT",
                         help='Cassandra port. (Default: 9042)')
+    parser.add_argument('--cassandra_username',
+                        metavar="USERNAME",
+                        default=None,
+                        help='Cassandra username. Optional.')
+    parser.add_argument('--cassandra_password',
+                        metavar="PASSWORD",
+                        default=None,
+                        help='Cassandra password. Optional.')
     parser.add_argument('--solr_host_and_port',
                         default='http://localhost:8983',
                         metavar='HOST:PORT',
@@ -99,6 +107,8 @@ async def main(loop):
 
     cassandra_contact_points = args.cassandra_contact_points
     cassandra_port = args.cassandra_port
+    cassandra_username = args.cassandra_username
+    cassandra_password = args.cassandra_password
     solr_host_and_port = args.solr_host_and_port
     zk_host_and_port = args.zk_host_and_port
 
@@ -106,11 +116,18 @@ async def main(loop):
                         rabbitmq_username=args.rabbitmq_username,
                         rabbitmq_password=args.rabbitmq_password,
                         rabbitmq_queue=args.rabbitmq_queue,
-                        data_store_factory=partial(cassandra_factory, 
cassandra_contact_points, cassandra_port),
+                        data_store_factory=partial(cassandra_factory,
+                                                   cassandra_contact_points,
+                                                   cassandra_port,
+                                                   cassandra_username,
+                                                   cassandra_password),
                         metadata_store_factory=partial(solr_factory, 
solr_host_and_port, zk_host_and_port))
     try:
         solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port 
else SolrStore(solr_url=solr_host_and_port)
-        await run_health_checks([CassandraStore(cassandra_contact_points, 
cassandra_port),
+        await run_health_checks([CassandraStore(cassandra_contact_points,
+                                                cassandra_port,
+                                                cassandra_username,
+                                                cassandra_password),
                                  solr_store,
                                  consumer])
         async with consumer:
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py 
b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index a667d5e..2181da2 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import logging
+import os
 import pickle
 import time
 from multiprocessing import Manager
@@ -99,6 +100,8 @@ class Pipeline:
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
 
+        self._max_concurrency = int(os.getenv('MAX_CONCURRENCY', 16))
+
         # Create a SyncManager so that we can to communicate exceptions from 
the
         # worker processes back to the main process.
         self._manager = Manager()
@@ -161,6 +164,8 @@ class Pipeline:
         return processor_module
 
     async def run(self):
+
+        logger.info(f"Running pipeline with {self._max_concurrency} threads 
per process")
         async with self._granule_loader as (dataset, granule_name):
             start = time.perf_counter()
 
@@ -170,7 +175,9 @@ class Pipeline:
                                       dataset,
                                       self._data_store_factory,
                                       self._metadata_store_factory,
-                                      shared_memory)) as pool:
+                                      shared_memory),
+                            maxtasksperchild=self._max_concurrency,
+                            childconcurrency=self._max_concurrency) as pool:
                 serialized_tiles = 
[nexusproto.NexusTile.SerializeToString(tile) for tile in
                                     self._slicer.generate_tiles(dataset, 
granule_name)]
                 # aiomultiprocess is built on top of the stdlib 
multiprocessing library, which has the limitation that
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py 
b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 6b2cf32..cb5232b 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -18,6 +18,7 @@ import asyncio
 import logging
 import uuid
 
+from cassandra.auth import PlainTextAuthProvider
 from cassandra.cluster import Cluster, Session, NoHostAvailable
 from cassandra.cqlengine import columns
 from cassandra.cqlengine.models import Model
@@ -39,8 +40,10 @@ class TileModel(Model):
 
 
 class CassandraStore(DataStore):
-    def __init__(self, contact_points=None, port=9042):
+    def __init__(self, contact_points=None, port=9042, username=None, 
password=None):
         self._contact_points = contact_points
+        self._username = username
+        self._password = password
         self._port = port
         self._session = None
 
@@ -53,11 +56,18 @@ class CassandraStore(DataStore):
             raise CassandraFailedHealthCheckError("Cannot connect to 
Cassandra!")
 
     def _get_session(self) -> Session:
+
+        if self._username and self._password:
+            auth_provider = PlainTextAuthProvider(username=self._username, 
password=self._password)
+        else:
+            auth_provider = None
+
         cluster = Cluster(contact_points=self._contact_points,
                           port=self._port,
                           # load_balancing_policy=
                           
reconnection_policy=ConstantReconnectionPolicy(delay=5.0),
-                          default_retry_policy=RetryPolicy())
+                          default_retry_policy=RetryPolicy(),
+                          auth_provider=auth_provider)
         session = cluster.connect()
         session.set_keyspace('nexustiles')
         return session
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py 
b/granule_ingester/granule_ingester/writers/SolrStore.py
index 926a75c..276a988 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -59,7 +59,7 @@ class SolrStore(MetadataStore):
 
     def _get_connection(self) -> pysolr.Solr:
         if self._zk_url:
-            zk = pysolr.ZooKeeper(f"{self._zk_url}/solr")
+            zk = pysolr.ZooKeeper(f"{self._zk_url}")
             collections = {}
             for c in zk.zk.get_children("collections"):
                 
collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))

Reply via email to