This is an automated email from the ASF dual-hosted git repository.

rkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/master by this push:
     new c88ca99  SDAP-485 - Improved writing to doms_data with very large 
numbers of points (#273)
c88ca99 is described below

commit c88ca993a28796445ac02f299f1f2a2e0abc2b39
Author: Riley Kuttruff <[email protected]>
AuthorDate: Thu Sep 7 13:16:20 2023 -0700

    SDAP-485 - Improved writing to doms_data with very large numbers of points 
(#273)
    
    * Improved writing to doms_data with very large numbers of points
    
    Previously a single failure would cause the entire write to be retried. Now 
only retries the failed points
    
    * SDAP-485 - Changelog
    
    ---------
    
    Co-authored-by: rileykk <[email protected]>
---
 CHANGELOG.md                                          |  1 +
 analysis/webservice/algorithms/doms/ResultsStorage.py | 14 ++++++++++----
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0346db7..e8712e8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,7 @@ and this project adheres to [Semantic 
Versioning](https://semver.org/spec/v2.0.0
 - SDAP-466: Matchup now defines secondary `platform` fields with `platform.id` 
if it is available and not blank. It then uses `platform.code` and 
`platform.type` as fallbacks, then just the value of `platform` if none work.
 - SDAP-468: Updated matchup output filename
 - SDAP-482: Updated Saildrone in situ endpoint in config file
+- SDAP-485: Improved behavior for retrying failed Cassandra inserts when 
saving matchup results.
 ### Deprecated
 ### Removed
 - SDAP-465: Removed `climatology` directory. 
diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py 
b/analysis/webservice/algorithms/doms/ResultsStorage.py
index a183859..48a89a0 100644
--- a/analysis/webservice/algorithms/doms/ResultsStorage.py
+++ b/analysis/webservice/algorithms/doms/ResultsStorage.py
@@ -195,9 +195,12 @@ class ResultsStorage(AbstractResultsContainer):
             inserts.extend(self.__prepare_result(execution_id, None, result, 
insertStatement))
 
         for i in range(5):
-            if not self.__insert_result_batches(inserts, insertStatement):
+            success, failed_entries = self.__insert_result_batches(inserts, 
insertStatement)
+
+            if not success:
                 if i < 4:
                     self._log.warning('Some write attempts failed; retrying')
+                    inserts = failed_entries
                     sleep(10)
                 else:
                     self._log.error('Some write attempts failed; max retries 
exceeded')
@@ -213,6 +216,8 @@ class ResultsStorage(AbstractResultsContainer):
         n_inserts = len(insert_params)
         writing = 0
 
+        failed = []
+
         self._log.info(f'Inserting {n_inserts} matchup entries in JSON format')
 
         for batch in query_batches:
@@ -222,16 +227,17 @@ class ResultsStorage(AbstractResultsContainer):
                 f'Writing batch of {len(batch)} matchup entries | 
({writing}/{n_inserts}) [{writing / n_inserts * 100:7.3f}%]')
 
             for entry in batch:
-                futures.append(self._session.execute_async(insertStatement, 
entry))
+                futures.append((entry, 
self._session.execute_async(insertStatement, entry)))
 
-            for future in futures:
+            for entry, future in futures:
                 try:
                     future.result()
                 except Exception:
                     move_successful = False
+                    failed.append(entry)
 
         self._log.info('Result data write attempt completed')
-        return move_successful
+        return move_successful, failed
 
     def __prepare_result(self, execution_id, primaryId, result, 
insertStatement):
         if 'primary' in result:

Reply via email to