This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch rabbitmq-fix
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 4b81e13601a94ff9a943bbf4c9bd5e644c651272
Author: Eamon Ford <[email protected]>
AuthorDate: Thu Jul 16 18:24:00 2020 -0700

    use pysolr
---
 granule_ingester/docker/entrypoint.sh              |  1 +
 granule_ingester/granule_ingester/main.py          | 14 ++--
 .../granule_ingester/pipeline/Pipeline.py          |  4 +-
 .../granule_ingester/writers/SolrStore.py          | 81 ++++++++++++++--------
 granule_ingester/requirements.txt                  |  4 +-
 5 files changed, 67 insertions(+), 37 deletions(-)

diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh
index b703ee3..b8369c6 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -10,3 +10,4 @@ python /sdap/granule_ingester/main.py \
   $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo --cassandra_username=$CASSANDRA_USERNAME) \
   $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra_password=$CASSANDRA_PASSWORD) \
   $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT)
+  $([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo --zk_host_and_port=$ZK_HOST_AND_PORT)
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 751da19..87a6d5a 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -15,16 +15,15 @@
 
 import argparse
 import asyncio
-from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
 import logging
+import sys
 from functools import partial
 from typing import List
 
 from granule_ingester.consumer import Consumer
+from granule_ingester.exceptions import FailedHealthCheckError, LostConnectionError
 from granule_ingester.healthcheck import HealthCheck
-from granule_ingester.writers import CassandraStore
-from granule_ingester.writers import SolrStore
-import sys
+from granule_ingester.writers import CassandraStore, SolrStore
 
 
 def cassandra_factory(contact_points, port, username, password):
@@ -33,8 +32,8 @@ def cassandra_factory(contact_points, port, username, password):
     return store
 
 
-def solr_factory(solr_host_and_port):
-    store = SolrStore(solr_host_and_port)
+def solr_factory(solr_host_and_port, zk_host_and_port):
+    store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
     store.connect()
     return store
 
@@ -86,6 +85,8 @@ async def main(loop):
                         default='http://localhost:8983',
                         metavar='HOST:PORT',
                         help='Solr host and port. (Default: http://localhost:8983)')
+    parser.add_argument('--zk_host_and_port',
+                        metavar="HOST:PORT")
     parser.add_argument('-v',
                         '--verbose',
                         action='store_true',
@@ -109,6 +110,7 @@ async def main(loop):
     cassandra_contact_points = args.cassandra_contact_points
     cassandra_port = args.cassandra_port
     solr_host_and_port = args.solr_host_and_port
+    zk_host_and_port = args.zk_host_and_port
 
     consumer = Consumer(rabbitmq_host=args.rabbitmq_host,
                         rabbitmq_username=args.rabbitmq_username,
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index f1aa021..a667d5e 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -27,7 +27,7 @@ from nexusproto import DataTile_pb2 as nexusproto
 from tblib import pickling_support
 from yaml.scanner import ScannerError
 
-from granule_ingester.exceptions import PipelineBuildingError, LostConnectionError
+from granule_ingester.exceptions import PipelineBuildingError
 from granule_ingester.granule_loaders import GranuleLoader
 from granule_ingester.pipeline.Modules import modules as processor_module_mappings
 from granule_ingester.processors.TileProcessor import TileProcessor
@@ -180,6 +180,8 @@ class Pipeline:
                         await pool.map(_process_tile_in_worker, chunk)
                     except ProxyException:
                         pool.terminate()
+                        # Give the shared memory manager some time to write the exception
+                        # await asyncio.sleep(1)
                         raise pickle.loads(shared_memory.error)
 
             end = time.perf_counter()
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 6baad28..926a75c 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -13,64 +13,87 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
-from asyncio import AbstractEventLoop
-
+import asyncio
+import functools
+import json
 import logging
+from asyncio import AbstractEventLoop
 from datetime import datetime
 from pathlib import Path
 from typing import Dict
 
-import aiohttp
+import pysolr
+from kazoo.handlers.threading import KazooTimeoutError
+from kazoo.exceptions import NoNodeError
 from nexusproto.DataTile_pb2 import *
-from tenacity import *
 
+from granule_ingester.exceptions import SolrFailedHealthCheckError, SolrLostConnectionError
 from granule_ingester.writers.MetadataStore import MetadataStore
-from granule_ingester.exceptions import SolrFailedHealthCheckError
+
 logger = logging.getLogger(__name__)
 
 
+def run_in_executor(f):
+    @functools.wraps(f)
+    def inner(*args, **kwargs):
+        loop = asyncio.get_running_loop()
+        return loop.run_in_executor(None, lambda: f(*args, **kwargs))
+
+    return inner
+
+
 class SolrStore(MetadataStore):
-    def __init__(self, host_and_port='http://localhost:8983'):
+    def __init__(self, solr_url=None, zk_url=None):
         super().__init__()
 
         self.TABLE_NAME = "sea_surface_temp"
         self.iso: str = '%Y-%m-%dT%H:%M:%SZ'
 
-        self._host_and_port = host_and_port
+        self._solr_url = solr_url
+        self._zk_url = zk_url
         self.geo_precision: int = 3
-        self.collection: str = "nexustiles"
+        self._collection: str = "nexustiles"
         self.log: logging.Logger = logging.getLogger(__name__)
         self.log.setLevel(logging.DEBUG)
-        self._session = None
+        self._solr = None
+
+    def _get_connection(self) -> pysolr.Solr:
+        if self._zk_url:
+            zk = pysolr.ZooKeeper(f"{self._zk_url}/solr")
+            collections = {}
+            for c in zk.zk.get_children("collections"):
+                collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))
+            zk.collections = collections
+            return pysolr.SolrCloud(zk, self._collection, always_commit=True)
+        elif self._solr_url:
+            return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
+        else:
+            raise RuntimeError("You must provide either solr_host or zookeeper_host.")
 
     def connect(self, loop: AbstractEventLoop = None):
-        self._session = aiohttp.ClientSession(loop=loop)
+        self._solr = self._get_connection()
 
     async def health_check(self):
         try:
-            async with aiohttp.ClientSession() as session:
-                response = await session.get('{}/solr/{}/admin/ping'.format(self._host_and_port, self.collection))
-                if response.status == 200:
-                    return True
-                else:
-                    logger.error("Solr health check returned status {}.".format(response.status))
-        except aiohttp.ClientConnectionError as e:
-            raise SolrFailedHealthCheckError("Cannot connect to to Solr!")
-
-        return False
+            connection = self._get_connection()
+            connection.ping()
+        except pysolr.SolrError:
+            raise SolrFailedHealthCheckError("Cannot connect to Solr!")
+        except NoNodeError:
+            raise SolrFailedHealthCheckError("Connected to Zookeeper but cannot connect to Solr!")
+        except KazooTimeoutError:
+            raise SolrFailedHealthCheckError("Cannot connect to Zookeeper!")
 
     async def save_metadata(self, nexus_tile: NexusTile) -> None:
         solr_doc = self._build_solr_doc(nexus_tile)
+        await self._save_document(solr_doc)
 
-        await self._save_document(self.collection, solr_doc)
-
-    @retry(stop=stop_after_attempt(5))
-    async def _save_document(self, collection: str, doc: dict):
-        url = '{}/solr/{}/update/json/docs?commit=true'.format(self._host_and_port, collection)
-        response = await self._session.post(url, json=doc)
-        if response.status < 200 or response.status >= 400:
-            raise RuntimeError("Saving data to Solr failed with HTTP status code {}".format(response.status))
+    @run_in_executor
+    def _save_document(self, doc: dict):
+        try:
+            self._solr.add([doc])
+        except pysolr.SolrError:
+            raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.")
 
     def _build_solr_doc(self, tile: NexusTile) -> Dict:
         summary: TileSummary = tile.summary
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index 0479f99..9b06860 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,4 +1,6 @@
 cassandra-driver==3.23.0
 aiomultiprocess==0.7.0
 aioboto3
-tblib==1.6.0
\ No newline at end of file
+tblib==1.6.0
+pysolr==3.9.0
+kazoo==2.8.0
\ No newline at end of file
