This is an automated email from the ASF dual-hosted git repository.

tloubrieu pushed a commit to branch support_solr_bitnami
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/support_solr_bitnami by this push:
     new daa4e16  other changes with retry policy for ingestion
daa4e16 is described below

commit daa4e162e96569e83caba5e555a46834f0545125
Author: thomas loubrieu <[email protected]>
AuthorDate: Wed Jul 21 14:44:57 2021 -0700

    other changes with retry policy for ingestion
---
 granule_ingester/docker/Dockerfile                 |  2 +-
 .../granule_ingester/pipeline/Pipeline.py          |  6 ++---
 .../granule_ingester/writers/CassandraStore.py     | 21 +++++++++++----
 .../writers/CassandraStoreConnectionRetryPolicy.py | 31 ++++++++++++++++++++++
 .../granule_ingester/writers/SolrStore.py          | 18 +++++++++----
 .../granule_ingester/writers/__init__.py           |  1 +
 6 files changed, 65 insertions(+), 14 deletions(-)

diff --git a/granule_ingester/docker/Dockerfile b/granule_ingester/docker/Dockerfile
index 6f9d525..6d7414b 100644
--- a/granule_ingester/docker/Dockerfile
+++ b/granule_ingester/docker/Dockerfile
@@ -23,5 +23,5 @@ RUN pip install boto3==1.16.10
 RUN apk del .build-deps
 
 USER 1001
-
+ENV OPENBLAS_NUM_THREADS=1
 ENTRYPOINT ["/bin/sh", "/entrypoint.sh"]
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 59f02a0..6c32bf6 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -101,7 +101,7 @@ class Pipeline:
         self._slicer = slicer
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
-        self._max_concurrency = max_concurrency
+        self._max_concurrency: int = max_concurrency
 
         # Create a SyncManager so that we can to communicate exceptions from the
         # worker processes back to the main process.
@@ -188,8 +188,8 @@ class Pipeline:
                                       self._data_store_factory,
                                       self._metadata_store_factory,
                                       shared_memory),
-                            maxtasksperchild=self._max_concurrency,
-                            childconcurrency=self._max_concurrency) as pool:
+                            maxtasksperchild=int(self._max_concurrency),
+                            childconcurrency=int(self._max_concurrency)) as pool:
                 serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
                                     self._slicer.generate_tiles(dataset, granule_name)]
                 # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 514d12f..505b56a 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -17,6 +17,7 @@
 import asyncio
 import logging
 import uuid
+import time
 
 from cassandra.auth import PlainTextAuthProvider
 from cassandra.cluster import Cluster, Session, NoHostAvailable
@@ -27,6 +28,7 @@ from nexusproto.DataTile_pb2 import NexusTile, TileData
 
 from granule_ingester.exceptions import CassandraFailedHealthCheckError, CassandraLostConnectionError
 from granule_ingester.writers.DataStore import DataStore
+from granule_ingester.writers.CassandraStoreConnectionRetryPolicy import CassandraStoreConnectionRetryPolicy
 
 logging.getLogger('cassandra').setLevel(logging.INFO)
 logger = logging.getLogger(__name__)
@@ -67,10 +69,13 @@ class CassandraStore(DataStore):
                           #load_balancing_policy=DCAwareRoundRobinPolicy("dc1"),
                           protocol_version=4,
                           reconnection_policy=ConstantReconnectionPolicy(delay=5.0),
-                          default_retry_policy=RetryPolicy(),
+                          default_retry_policy=CassandraStoreConnectionRetryPolicy(),
                           auth_provider=auth_provider)
+
         session = cluster.connect()
         session.set_keyspace('nexustiles')
+        session.default_timeout = 60
+
         return session
 
     def connect(self):
@@ -80,7 +85,7 @@ class CassandraStore(DataStore):
         if self._session:
             self._session.shutdown()
 
-    async def save_data(self, tile: NexusTile) -> None:
+    async def save_data(self, tile: NexusTile, max_num_try=6, num_try=0) -> None:
         try:
             tile_id = uuid.UUID(tile.summary.tile_id)
             serialized_tile_data = TileData.SerializeToString(tile.tile)
@@ -89,12 +94,18 @@ class CassandraStore(DataStore):
             await self._execute_query_async(self._session, prepared_query,
                                             [tile_id, bytearray(serialized_tile_data)])
         except Exception as e:
-            logger.error("exception while uploading tile data on cassandra %s", e)
-            raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.")
+            if max_num_try >= num_try:
+                time.sleep(2**num_try)
+                logger.warning("exception while uploading tile data on cassandra %s, retry once more", e)
+                await self.save_data(tile, max_num_try=max_num_try, num_try=num_try+1)
+            else:
+                logger.error("exception while uploading tile data on cassandra %s, second attempt", e)
+                raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.")
+
 
     @staticmethod
     async def _execute_query_async(session: Session, query, parameters=None):
-        cassandra_future = session.execute_async(query, parameters)
+        cassandra_future = session.execute_async(query, parameters, timeout=6000)
         asyncio_future = asyncio.Future()
         cassandra_future.add_callbacks(asyncio_future.set_result, asyncio_future.set_exception)
         return await asyncio_future
diff --git a/granule_ingester/granule_ingester/writers/CassandraStoreConnectionRetryPolicy.py b/granule_ingester/granule_ingester/writers/CassandraStoreConnectionRetryPolicy.py
new file mode 100644
index 0000000..4318dc2
--- /dev/null
+++ b/granule_ingester/granule_ingester/writers/CassandraStoreConnectionRetryPolicy.py
@@ -0,0 +1,31 @@
+import logging
+from cassandra.policies import RetryPolicy
+from cassandra import WriteType as WT
+
+WriteType = WT
+
+logging.getLogger('cassandra').setLevel(logging.DEBUG)
+logger = logging.getLogger(__name__)
+
+
+class CassandraStoreConnectionRetryPolicy(RetryPolicy):
+
+     def on_write_timeout(self, query, consistency, write_type,
+                          required_responses, received_responses, retry_num):
+        """
+        By default, failed write operations will retried at most once, and
+        they will only be retried if the `write_type` was
+        :attr:`~.WriteType.BATCH_LOG or SIMPLE`.
+        """
+        logger.debug("Write timeout policy applied num retry %i, write_type %i", retry_num, write_type)
+        if retry_num != 0:
+            return self.RETHROW, None
+        elif write_type == WriteType.BATCH_LOG or write_type == WriteType.SIMPLE:
+            return self.RETRY, consistency
+        else:
+            return self.RETHROW, None
+
+
+
+
+
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 42ca066..3da6370 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -17,6 +17,7 @@ import asyncio
 import functools
 import json
 import logging
+import time
 from asyncio import AbstractEventLoop
 from datetime import datetime
 from pathlib import Path
@@ -57,10 +58,10 @@ class SolrStore(MetadataStore):
         """
 
         try:
-            logger.info("getting solr configuration from zookeeper, node '%s'", parent_nodes[0])
+            logger.debug("getting solr configuration from zookeeper, node '%s'", parent_nodes[0])
             return parent_nodes[0], zk.zk.get_children(parent_nodes[0])
         except NoNodeError:
-            logger.info("solr configuration not found in node '%s'", parent_nodes[0])
+            logger.debug("solr configuration not found in node '%s'", parent_nodes[0])
             if len(parent_nodes)>1:
                 return self._get_collections(zk, parent_nodes[1:])
             else:
@@ -111,11 +112,18 @@ class SolrStore(MetadataStore):
         await self._save_document(solr_doc)
 
     @run_in_executor
-    def _save_document(self, doc: dict):
+    def _save_document(self, doc: dict, max_num_try=6, num_try=0):
         try:
             self._solr.add([doc])
-        except pysolr.SolrError:
-            raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.")
+        except pysolr.SolrError as e:
+            if max_num_try >= num_try :
+                time.sleep(2**num_try)
+                logger.warning("Lost connection to Solr, %s, retry once more", e)
+                self._save_document(doc,
+                                    max_num_try=max_num_try,
+                                    num_try=num_try+1)
+            else:
+                raise SolrLostConnectionError("Lost connection to Solr, and cannot save tiles.")
 
     def _build_solr_doc(self, tile: NexusTile) -> Dict:
         summary: TileSummary = tile.summary
diff --git a/granule_ingester/granule_ingester/writers/__init__.py b/granule_ingester/granule_ingester/writers/__init__.py
index 9323d8c..c9f30e0 100644
--- a/granule_ingester/granule_ingester/writers/__init__.py
+++ b/granule_ingester/granule_ingester/writers/__init__.py
@@ -1,4 +1,5 @@
 from granule_ingester.writers.DataStore import DataStore
 from granule_ingester.writers.MetadataStore import MetadataStore
 from granule_ingester.writers.SolrStore import SolrStore
+from granule_ingester.writers.CassandraStoreConnectionRetryPolicy import CassandraStoreConnectionRetryPolicy
 from granule_ingester.writers.CassandraStore import CassandraStore
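
Note for reviewers of the retry behavior added above: SolrStore._save_document and CassandraStore.save_data now share the same pattern, retrying a failed write after sleeping 2**num_try seconds and giving up once num_try exceeds max_num_try. The sketch below isolates that pattern in plain Python for illustration only; write_tile and TransientWriteError are hypothetical placeholders (standing in for the Solr/Cassandra write calls and for an error such as pysolr.SolrError), not identifiers from the ingester code.

    import logging
    import time

    logger = logging.getLogger(__name__)


    class TransientWriteError(Exception):
        """Hypothetical stand-in for a backend error such as pysolr.SolrError."""


    def save_with_retry(write_tile, tile, max_num_try=6, num_try=0):
        """Try the write; on failure, back off 2**num_try seconds and recurse
        until num_try exceeds max_num_try, then re-raise to the caller."""
        try:
            write_tile(tile)
        except TransientWriteError as e:
            if max_num_try >= num_try:
                time.sleep(2 ** num_try)
                logger.warning("write failed (%s), retrying (attempt %d)", e, num_try + 1)
                save_with_retry(write_tile, tile, max_num_try=max_num_try, num_try=num_try + 1)
            else:
                raise

The recursive form mirrors the committed methods; with the default max_num_try=6, a persistent failure results in eight attempts with backoff sleeps totaling 1+2+4+8+16+32+64 = 127 seconds before the exception is finally propagated (which the committed code wraps in CassandraLostConnectionError / SolrLostConnectionError).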
