This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 6ca29339fa639950023382bec513781151d5eb1e
Author: Eamon Ford <[email protected]>
AuthorDate: Thu Jul 16 18:24:00 2020 -0700

    use pysolr
---
 granule_ingester/docker/entrypoint.sh              |  1 +
 .../granule_ingester/exceptions/Exceptions.py      |  3 +
 .../granule_ingester/exceptions/__init__.py        |  1 +
 granule_ingester/granule_ingester/main.py          | 19 ++---
 .../granule_ingester/pipeline/Pipeline.py          |  4 +-
 .../granule_ingester/writers/SolrStore.py          | 81 ++++++++++++++--------
 granule_ingester/requirements.txt                  |  4 +-
 7 files changed, 74 insertions(+), 39 deletions(-)

diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
index e6f7262..3a1cb9b 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -8,3 +8,4 @@ python /sdap/granule_ingester/main.py \
   $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \
   $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \
   $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT)
+  $([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo --zk_host_and_port=$ZK_HOST_AND_PORT)
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index ca60608..c648b99 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -21,6 +21,9 @@ class RabbitMQLostConnectionError(LostConnectionError):
 class CassandraLostConnectionError(LostConnectionError):
     pass
 
+class SolrLostConnectionError(LostConnectionError):
+    pass
+
 
 class FailedHealthCheckError(Exception):
     pass
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py b/granule_ingester/granule_ingester/exceptions/__init__.py
index 31cc5b8..ea0969f 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -7,4 +7,5 @@ from .Exceptions import PipelineRunningError
 from .Exceptions import RabbitMQFailedHealthCheckError
 from .Exceptions import RabbitMQLostConnectionError
 from .Exceptions import SolrFailedHealthCheckError
+from .Exceptions import SolrLostConnectionError
 from .Exceptions import TileProcessingError
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 45877c2..f602da8 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -15,16 +15,15 @@
 
 import argparse
 import asyncio
-from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
 import logging
+import sys
 from functools import partial
 from typing import List
 
 from granule_ingester.consumer import Consumer
+from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
 from granule_ingester.healthcheck import HealthCheck
-from granule_ingester.writers import CassandraStore
-from granule_ingester.writers import SolrStore
-import sys
+from granule_ingester.writers import CassandraStore, SolrStore
 
 
 def cassandra_factory(contact_points, port):
@@ -33,8 +32,8 @@ def cassandra_factory(contact_points, port):
     return store
 
 
-def solr_factory(solr_host_and_port):
-    store = SolrStore(solr_host_and_port)
+def solr_factory(solr_host_and_port, zk_host_and_port):
+    store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
     store.connect()
     return store
 
@@ -78,6 +77,8 @@ async def main(loop):
                         default='http://localhost:8983',
                         metavar='HOST:PORT',
                         help='Solr host and port. (Default: http://localhost:8983)')
+    parser.add_argument('--zk_host_and_port',
+                        metavar="HOST:PORT")
     parser.add_argument('-v',
                         '--verbose',
                         action='store_true',
@@ -99,16 +100,18 @@ async def main(loop):
     cassandra_contact_points = args.cassandra_contact_points
     cassandra_port = args.cassandra_port
     solr_host_and_port = args.solr_host_and_port
+    zk_host_and_port = args.zk_host_and_port
 
     consumer = Consumer(rabbitmq_host=args.rabbitmq_host,
                         rabbitmq_username=args.rabbitmq_username,
                         rabbitmq_password=args.rabbitmq_password,
                         rabbitmq_queue=args.rabbitmq_queue,
                         data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port),
-                        metadata_store_factory=partial(solr_factory, solr_host_and_port))
+                        metadata_store_factory=partial(solr_factory, solr_host_and_port, zk_host_and_port))
     try:
+        solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
         await run_health_checks([CassandraStore(cassandra_contact_points, cassandra_port),
-                                 SolrStore(solr_host_and_port),
+                                 solr_store,
                                  consumer])
         async with consumer:
             logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index f1aa021..a667d5e 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -27,7 +27,7 @@ from nexusproto import DataTile_pb2 as nexusproto
 from tblib import pickling_support
 from yaml.scanner import ScannerError
 
-from granule_ingester.exceptions import PipelineBuildingError, LostConnectionError
+from granule_ingester.exceptions import PipelineBuildingError
 from granule_ingester.granule_loaders import GranuleLoader
 from granule_ingester.pipeline.Modules import modules as processor_module_mappings
 from granule_ingester.processors.TileProcessor import TileProcessor
@@ -180,6 +180,8 @@ class Pipeline:
                         await pool.map(_process_tile_in_worker, chunk)
                     except ProxyException:
                         pool.terminate()
+                        # Give the shared memory manager some time to write the exception
+                        # await asyncio.sleep(1)
                         raise pickle.loads(shared_memory.error)
 
         end = time.perf_counter()
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 6baad28..926a75c 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -13,64 +13,87 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
-from asyncio import AbstractEventLoop
-
+import asyncio
+import functools
+import json
 import logging
+from asyncio import AbstractEventLoop
 from datetime import datetime
 from pathlib import Path
 from typing import Dict
 
-import aiohttp
+import pysolr
+from kazoo.handlers.threading import KazooTimeoutError
+from kazoo.exceptions import NoNodeError
 from nexusproto.DataTile_pb2 import *
-from tenacity import *
 
+from granule_ingester.exceptions import SolrFailedHealthCheckError, SolrLostConnectionError
 from granule_ingester.writers.MetadataStore import MetadataStore
-from granule_ingester.exceptions import SolrFailedHealthCheckError
+
 logger = logging.getLogger(__name__)
 
 
+def run_in_executor(f):
+    @functools.wraps(f)
+    def inner(*args, **kwargs):
+        loop = asyncio.get_running_loop()
+        return loop.run_in_executor(None, lambda: f(*args, **kwargs))
+
+    return inner
+
+
 class SolrStore(MetadataStore):
-    def __init__(self, host_and_port='http://localhost:8983'):
+    def __init__(self, solr_url=None, zk_url=None):
         super().__init__()
 
         self.TABLE_NAME = "sea_surface_temp"
         self.iso: str = '%Y-%m-%dT%H:%M:%SZ'
 
-        self._host_and_port = host_and_port
+        self._solr_url = solr_url
+        self._zk_url = zk_url
         self.geo_precision: int = 3
-        self.collection: str = "nexustiles"
+        self._collection: str = "nexustiles"
         self.log: logging.Logger = logging.getLogger(__name__)
         self.log.setLevel(logging.DEBUG)
-        self._session = None
+        self._solr = None
+
+    def _get_connection(self) -> pysolr.Solr:
+        if self._zk_url:
+            zk = pysolr.ZooKeeper(f"{self._zk_url}/solr")
+            collections = {}
+            for c in zk.zk.get_children("collections"):
+                
collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))
+            zk.collections = collections
+            return pysolr.SolrCloud(zk, self._collection, always_commit=True)
+        elif self._solr_url:
+            return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
+        else:
+            raise RuntimeError("You must provide either solr_host or zookeeper_host.")
 
     def connect(self, loop: AbstractEventLoop = None):
-        self._session = aiohttp.ClientSession(loop=loop)
+        self._solr = self._get_connection()
 
     async def health_check(self):
         try:
-            async with aiohttp.ClientSession() as session:
-                response = await session.get('{}/solr/{}/admin/ping'.format(self._host_and_port, self.collection))
-                if response.status == 200:
-                    return True
-                else:
-                    logger.error("Solr health check returned status {}.".format(response.status))
-        except aiohttp.ClientConnectionError as e:
-            raise SolrFailedHealthCheckError("Cannot connect to to Solr!")
-
-        return False
+            connection = self._get_connection()
+            connection.ping()
+        except pysolr.SolrError:
+            raise SolrFailedHealthCheckError("Cannot connect to Solr!")
+        except NoNodeError:
+            raise SolrFailedHealthCheckError("Connected to Zookeeper but cannot connect to Solr!")
+        except KazooTimeoutError:
+            raise SolrFailedHealthCheckError("Cannot connect to Zookeeper!")
 
     async def save_metadata(self, nexus_tile: NexusTile) -> None:
         solr_doc = self._build_solr_doc(nexus_tile)
+        await self._save_document(solr_doc)
 
-        await self._save_document(self.collection, solr_doc)
-
-    @retry(stop=stop_after_attempt(5))
-    async def _save_document(self, collection: str, doc: dict):
-        url = '{}/solr/{}/update/json/docs?commit=true'.format(self._host_and_port, collection)
-        response = await self._session.post(url, json=doc)
-        if response.status < 200 or response.status >= 400:
-            raise RuntimeError("Saving data to Solr failed with HTTP status code {}".format(response.status))
+    @run_in_executor
+    def _save_document(self, doc: dict):
+        try:
+            self._solr.add([doc])
+        except pysolr.SolrError:
+            raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.")
 
     def _build_solr_doc(self, tile: NexusTile) -> Dict:
         summary: TileSummary = tile.summary
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index 0479f99..9b06860 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,4 +1,6 @@
 cassandra-driver==3.23.0
 aiomultiprocess==0.7.0
 aioboto3
-tblib==1.6.0
\ No newline at end of file
+tblib==1.6.0
+pysolr==3.9.0
+kazoo==2.8.0
\ No newline at end of file

Reply via email to