This is an automated email from the ASF dual-hosted git repository.
rkk pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/sdap-ingester.git
The following commit(s) were added to refs/heads/develop by this push:
new c37c722 SDAP-512 - Have GI close ZK/Solr/Cassandra connections when
done with writing to Solr (#97)
c37c722 is described below
commit c37c722b6bb2e72336403686069dae754052f457
Author: Riley Kuttruff <[email protected]>
AuthorDate: Thu May 9 09:25:32 2024 -0700
SDAP-512 - Have GI close ZK/Solr/Cassandra connections when done with
writing to Solr (#97)
* Explicitly close Solr & ZK connections
* Try just using destructor instead of explicit close() calls
* Changelog
* Removed commented out code
* Also close Cassandra connections after write
* Update changelog
---------
Co-authored-by: rileykk <[email protected]>
---
CHANGELOG.md | 1 +
.../granule_ingester/writers/CassandraStore.py | 14 +++++++++---
.../granule_ingester/writers/DataStore.py | 11 ++++++++++
.../granule_ingester/writers/MetadataStore.py | 13 +++++++++++
.../granule_ingester/writers/SolrStore.py | 25 ++++++++++++++++------
5 files changed, 54 insertions(+), 10 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e8340ad..ced6fca 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- SDAP-501: Updated dependencies to remove `chardet`
### Fixed
- SDAP-488: Workaround to build issue on Apple Silicon (M1/M2). GI image build installs nexusproto through PyPI instead of building from source. A build arg `BUILD_NEXUSPROTO` was defined to allow building from source if desired/
+- SDAP-512: Fixed Granule Ingester not closing connections to Zookeeper/Solr/Cassandra, eventually exhausting network resources and requiring a restart
### Security
## [1.1.0] - 2023-04-26
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index ba2fd0d..297854f 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -86,9 +86,17 @@ class CassandraStore(DataStore):
def connect(self):
self._session = self._get_session()
- def __del__(self):
- if self._session:
- self._session.shutdown()
+ def close(self):
+ session: Session = self._session
+ if session is not None:
+ cluster = session.cluster
+
+ session.shutdown()
+ cluster.shutdown()
+
+ del cluster, session
+
+ self._session = None
@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=1, max=12))
async def save_data(self, tile: NexusTile) -> None:
diff --git a/granule_ingester/granule_ingester/writers/DataStore.py b/granule_ingester/granule_ingester/writers/DataStore.py
index 7ed3d6b..a5e39ab 100644
--- a/granule_ingester/granule_ingester/writers/DataStore.py
+++ b/granule_ingester/granule_ingester/writers/DataStore.py
@@ -23,6 +23,17 @@ from typing import List
class DataStore(HealthCheck, ABC):
+ @abstractmethod
+ def connect(self):
+ ...
+
+ @abstractmethod
+ def close(self):
+ ...
+
+ def __del__(self):
+ self.close()
+
@abstractmethod
def save_data(self, nexus_tile: nexusproto.NexusTile) -> None:
diff --git a/granule_ingester/granule_ingester/writers/MetadataStore.py b/granule_ingester/granule_ingester/writers/MetadataStore.py
index 8a97566..e1e317b 100644
--- a/granule_ingester/granule_ingester/writers/MetadataStore.py
+++ b/granule_ingester/granule_ingester/writers/MetadataStore.py
@@ -18,6 +18,8 @@ from abc import ABC, abstractmethod
from nexusproto import DataTile_pb2 as nexusproto
from granule_ingester.healthcheck import HealthCheck
+from asyncio import AbstractEventLoop
+
from typing import List
@@ -31,3 +33,14 @@ class MetadataStore(HealthCheck, ABC):
def save_batch(self, tiles: List[nexusproto.NexusTile]) -> None:
pass
+ @abstractmethod
+ def connect(self, loop: AbstractEventLoop = None) -> None:
+ pass
+
+ @abstractmethod
+ def close(self) -> None:
+ pass
+
+ def __del__(self):
+ self.close()
+
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 2dac038..963e4bc 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -21,7 +21,7 @@ import logging
from asyncio import AbstractEventLoop
from datetime import datetime
from pathlib import Path
-from typing import Dict, List, Union
+from typing import Dict, List, Union, Tuple, Optional
import pysolr
from kazoo.exceptions import NoNodeError
@@ -51,7 +51,8 @@ class SolrStore(MetadataStore):
self.geo_precision: int = 3
self._collection: str = "nexustiles"
self.log: logging.Logger = logging.getLogger(__name__)
- self._solr = None
+ self._solr: Optional[pysolr.Solr] = None
+ self._zk: Optional[pysolr.ZooKeeper] = None
def _get_collections(self, zk, parent_nodes):
"""
@@ -85,23 +86,33 @@ class SolrStore(MetadataStore):
collections.update(json.loads(zk.zk.get(f"{parent_node}/{c}/state.json")[0].decode("utf-8")))
zk.collections = collections
- def _get_connection(self) -> pysolr.Solr:
+ def _get_connection(self) -> Tuple[pysolr.Solr, Union[pysolr.ZooKeeper, None]]:
if self._zk_url:
zk = pysolr.ZooKeeper(f"{self._zk_url}")
self._set_solr_status(zk)
- return pysolr.SolrCloud(zk, self._collection, always_commit=True)
+ return pysolr.SolrCloud(zk, self._collection, always_commit=True), zk
elif self._solr_url:
- return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
+ return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True), None
else:
raise RuntimeError("You must provide either solr_host or zookeeper_host.")
def connect(self, loop: AbstractEventLoop = None):
- self._solr = self._get_connection()
+ self._solr, self._zk = self._get_connection()
+
+ def close(self):
+ if self._solr is not None:
+ self._solr.get_session().close()
+
+ if self._zk is not None:
+ self._zk.zk.stop()
+ self._zk.zk.close()
async def health_check(self):
try:
- connection = self._get_connection()
+ connection, _ = self._get_connection()
connection.ping()
+
+ self.close()
except pysolr.SolrError:
raise SolrFailedHealthCheckError("Cannot connect to Solr!")
except NoNodeError: