This is an automated email from the ASF dual-hosted git repository.

rkk pushed a commit to branch SDAP-512
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 835123f88af734266e765b2b2dda1266f8a97a73
Author: rileykk <[email protected]>
AuthorDate: Wed Mar 6 09:41:52 2024 -0800

    Explicitly close Solr & ZK connections
---
 .../granule_ingester/pipeline/Pipeline.py | 6 +++++-
 .../granule_ingester/writers/MetadataStore.py | 10 +++++++++
 .../granule_ingester/writers/SolrStore.py | 25 ++++++++++++++++------
 3 files changed, 33 insertions(+), 8 deletions(-)

diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 9ebb529..541700f 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -257,7 +257,11 @@ class Pipeline:
             logger.info(f"Now writing generated tiles...")
 
             await self._data_store_factory().save_batch(results)
-            await self._metadata_store_factory().save_batch(results)
+
+            metadata_store = self._metadata_store_factory()
+            await metadata_store.save_batch(results)
+
+            metadata_store.close()
 
         end = time.perf_counter()
         logger.info("Pipeline finished in {} seconds".format(end - start))
diff --git a/granule_ingester/granule_ingester/writers/MetadataStore.py b/granule_ingester/granule_ingester/writers/MetadataStore.py
index 8a97566..7296075 100644
--- a/granule_ingester/granule_ingester/writers/MetadataStore.py
+++ b/granule_ingester/granule_ingester/writers/MetadataStore.py
@@ -18,6 +18,8 @@ from abc import ABC, abstractmethod
 from nexusproto import DataTile_pb2 as nexusproto
 from granule_ingester.healthcheck import HealthCheck
 
+from asyncio import AbstractEventLoop
+
 from typing import List
 
 
@@ -31,3 +33,11 @@ class MetadataStore(HealthCheck, ABC):
     def save_batch(self, tiles: List[nexusproto.NexusTile]) -> None:
         pass
 
+    @abstractmethod
+    def connect(self, loop: AbstractEventLoop = None) -> None:
+        pass
+
+    @abstractmethod
+    def close(self) -> None:
+        pass
+
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 2dac038..963e4bc 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -21,7 +21,7 @@ import logging
 from asyncio import AbstractEventLoop
 from datetime import datetime
 from pathlib import Path
-from typing import Dict, List, Union
+from typing import Dict, List, Union, Tuple, Optional
 
 import pysolr
 from kazoo.exceptions import NoNodeError
@@ -51,7 +51,8 @@ class SolrStore(MetadataStore):
         self.geo_precision: int = 3
         self._collection: str = "nexustiles"
         self.log: logging.Logger = logging.getLogger(__name__)
-        self._solr = None
+        self._solr: Optional[pysolr.Solr] = None
+        self._zk: Optional[pysolr.ZooKeeper] = None
 
     def _get_collections(self, zk, parent_nodes):
         """
@@ -85,23 +86,33 @@ class SolrStore(MetadataStore):
             collections.update(json.loads(zk.zk.get(f"{parent_node}/{c}/state.json")[0].decode("utf-8")))
         zk.collections = collections
 
-    def _get_connection(self) -> pysolr.Solr:
+    def _get_connection(self) -> Tuple[pysolr.Solr, Union[pysolr.ZooKeeper, None]]:
         if self._zk_url:
             zk = pysolr.ZooKeeper(f"{self._zk_url}")
             self._set_solr_status(zk)
-            return pysolr.SolrCloud(zk, self._collection, always_commit=True)
+            return pysolr.SolrCloud(zk, self._collection, always_commit=True), zk
         elif self._solr_url:
-            return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
+            return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True), None
         else:
             raise RuntimeError("You must provide either solr_host or zookeeper_host.")
 
     def connect(self, loop: AbstractEventLoop = None):
-        self._solr = self._get_connection()
+        self._solr, self._zk = self._get_connection()
+
+    def close(self):
+        if self._solr is not None:
+            self._solr.get_session().close()
+
+        if self._zk is not None:
+            self._zk.zk.stop()
+            self._zk.zk.close()
 
     async def health_check(self):
         try:
-            connection = self._get_connection()
+            connection, _ = self._get_connection()
             connection.ping()
+
+            self.close()
         except pysolr.SolrError:
             raise SolrFailedHealthCheckError("Cannot connect to Solr!")
         except NoNodeError:
