This is an automated email from the ASF dual-hosted git repository.

rkk pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/sdap-ingester.git


The following commit(s) were added to refs/heads/develop by this push:
     new c37c722  SDAP-512 - Have GI close ZK/Solr/Cassandra connections when done with writing to Solr (#97)
c37c722 is described below

commit c37c722b6bb2e72336403686069dae754052f457
Author: Riley Kuttruff <[email protected]>
AuthorDate: Thu May 9 09:25:32 2024 -0700

    SDAP-512 - Have GI close ZK/Solr/Cassandra connections when done with writing to Solr (#97)
    
    * Explicitly close Solr & ZK connections
    
    * Try just using destructor instead of explicit close() calls
    
    * Changelog
    
    * Removed commented out code
    
    * Also close Cassandra connections after write
    
    * Update changelog
    
    ---------
    
    Co-authored-by: rileykk <[email protected]>
---
 CHANGELOG.md                                       |  1 +
 .../granule_ingester/writers/CassandraStore.py     | 14 +++++++++---
 .../granule_ingester/writers/DataStore.py          | 11 ++++++++++
 .../granule_ingester/writers/MetadataStore.py      | 13 +++++++++++
 .../granule_ingester/writers/SolrStore.py          | 25 ++++++++++++++++------
 5 files changed, 54 insertions(+), 10 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e8340ad..ced6fca 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - SDAP-501: Updated dependencies to remove `chardet`
 ### Fixed
 - SDAP-488: Workaround to build issue on Apple Silicon (M1/M2). GI image build installs nexusproto through PyPI instead of building from source. A build arg `BUILD_NEXUSPROTO` was defined to allow building from source if desired/
+- SDAP-512: Fixed Granule Ingester not closing connections to Zookeeper/Solr/Cassandra, eventually exhausting network resources and requiring a restart
 ### Security
 
 ## [1.1.0] - 2023-04-26
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index ba2fd0d..297854f 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -86,9 +86,17 @@ class CassandraStore(DataStore):
     def connect(self):
         self._session = self._get_session()
 
-    def __del__(self):
-        if self._session:
-            self._session.shutdown()
+    def close(self):
+        session: Session = self._session
+        if session is not None:
+            cluster = session.cluster
+
+            session.shutdown()
+            cluster.shutdown()
+
+            del cluster, session
+
+            self._session = None
 
     @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=1, max=12))
     async def save_data(self, tile: NexusTile) -> None:
diff --git a/granule_ingester/granule_ingester/writers/DataStore.py b/granule_ingester/granule_ingester/writers/DataStore.py
index 7ed3d6b..a5e39ab 100644
--- a/granule_ingester/granule_ingester/writers/DataStore.py
+++ b/granule_ingester/granule_ingester/writers/DataStore.py
@@ -23,6 +23,17 @@ from typing import List
 
 
 class DataStore(HealthCheck, ABC):
+    @abstractmethod
+    def connect(self):
+        ...
+
+    @abstractmethod
+    def close(self):
+        ...
+
+    def __del__(self):
+        self.close()
+
 
     @abstractmethod
     def save_data(self, nexus_tile: nexusproto.NexusTile) -> None:
diff --git a/granule_ingester/granule_ingester/writers/MetadataStore.py b/granule_ingester/granule_ingester/writers/MetadataStore.py
index 8a97566..e1e317b 100644
--- a/granule_ingester/granule_ingester/writers/MetadataStore.py
+++ b/granule_ingester/granule_ingester/writers/MetadataStore.py
@@ -18,6 +18,8 @@ from abc import ABC, abstractmethod
 from nexusproto import DataTile_pb2 as nexusproto
 
 from granule_ingester.healthcheck import HealthCheck
+from asyncio import AbstractEventLoop
+
 
 from typing import List
 
@@ -31,3 +33,14 @@ class MetadataStore(HealthCheck, ABC):
     def save_batch(self, tiles: List[nexusproto.NexusTile]) -> None:
         pass
 
+    @abstractmethod
+    def connect(self, loop: AbstractEventLoop = None) -> None:
+        pass
+
+    @abstractmethod
+    def close(self) -> None:
+        pass
+
+    def __del__(self):
+        self.close()
+
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 2dac038..963e4bc 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -21,7 +21,7 @@ import logging
 from asyncio import AbstractEventLoop
 from datetime import datetime
 from pathlib import Path
-from typing import Dict, List, Union
+from typing import Dict, List, Union, Tuple, Optional
 
 import pysolr
 from kazoo.exceptions import NoNodeError
@@ -51,7 +51,8 @@ class SolrStore(MetadataStore):
         self.geo_precision: int = 3
         self._collection: str = "nexustiles"
         self.log: logging.Logger = logging.getLogger(__name__)
-        self._solr = None
+        self._solr: Optional[pysolr.Solr] = None
+        self._zk: Optional[pysolr.ZooKeeper] = None
 
     def _get_collections(self, zk, parent_nodes):
         """
@@ -85,23 +86,33 @@ class SolrStore(MetadataStore):
             collections.update(json.loads(zk.zk.get(f"{parent_node}/{c}/state.json")[0].decode("utf-8")))
         zk.collections = collections
 
-    def _get_connection(self) -> pysolr.Solr:
+    def _get_connection(self) -> Tuple[pysolr.Solr, Union[pysolr.ZooKeeper, None]]:
         if self._zk_url:
             zk = pysolr.ZooKeeper(f"{self._zk_url}")
             self._set_solr_status(zk)
-            return pysolr.SolrCloud(zk, self._collection, always_commit=True)
+            return pysolr.SolrCloud(zk, self._collection, always_commit=True), zk
         elif self._solr_url:
-            return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
+            return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True), None
         else:
             raise RuntimeError("You must provide either solr_host or zookeeper_host.")
 
     def connect(self, loop: AbstractEventLoop = None):
-        self._solr = self._get_connection()
+        self._solr, self._zk = self._get_connection()
+
+    def close(self):
+        if self._solr is not None:
+            self._solr.get_session().close()
+
+        if self._zk is not None:
+            self._zk.zk.stop()
+            self._zk.zk.close()
 
     async def health_check(self):
         try:
-            connection = self._get_connection()
+            connection, _ = self._get_connection()
             connection.ping()
+
+            self.close()
         except pysolr.SolrError:
             raise SolrFailedHealthCheckError("Cannot connect to Solr!")
         except NoNodeError:

Reply via email to