This is an automated email from the ASF dual-hosted git repository.
rkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/master by this push:
new c88ca99 SDAP-485 - Improved writing to doms_data with very large
numbers of points (#273)
c88ca99 is described below
commit c88ca993a28796445ac02f299f1f2a2e0abc2b39
Author: Riley Kuttruff <[email protected]>
AuthorDate: Thu Sep 7 13:16:20 2023 -0700
SDAP-485 - Improved writing to doms_data with very large numbers of points
(#273)
* Improved writing to doms_data with very large numbers of points
Previously a single failure would cause the entire write to be retried. Now
only the failed points are retried
* SDAP-485 - Changelog
---------
Co-authored-by: rileykk <[email protected]>
---
CHANGELOG.md | 1 +
analysis/webservice/algorithms/doms/ResultsStorage.py | 14 ++++++++++----
2 files changed, 11 insertions(+), 4 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0346db7..e8712e8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,7 @@ and this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0
- SDAP-466: Matchup now defines secondary `platform` fields with `platform.id`
if it is available and not blank. It then uses `platform.code` and
`platform.type` as fallbacks, then just the value of `platform` if none work.
- SDAP-468: Updated matchup output filename
- SDAP-482: Updated Saildrone in situ endpoint in config file
+- SDAP-485: Improved behavior for retrying failed Cassandra inserts when
saving matchup results.
### Deprecated
### Removed
- SDAP-465: Removed `climatology` directory.
diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py
b/analysis/webservice/algorithms/doms/ResultsStorage.py
index a183859..48a89a0 100644
--- a/analysis/webservice/algorithms/doms/ResultsStorage.py
+++ b/analysis/webservice/algorithms/doms/ResultsStorage.py
@@ -195,9 +195,12 @@ class ResultsStorage(AbstractResultsContainer):
inserts.extend(self.__prepare_result(execution_id, None, result,
insertStatement))
for i in range(5):
- if not self.__insert_result_batches(inserts, insertStatement):
+ success, failed_entries = self.__insert_result_batches(inserts,
insertStatement)
+
+ if not success:
if i < 4:
self._log.warning('Some write attempts failed; retrying')
+ inserts = failed_entries
sleep(10)
else:
self._log.error('Some write attempts failed; max retries
exceeded')
@@ -213,6 +216,8 @@ class ResultsStorage(AbstractResultsContainer):
n_inserts = len(insert_params)
writing = 0
+ failed = []
+
self._log.info(f'Inserting {n_inserts} matchup entries in JSON format')
for batch in query_batches:
@@ -222,16 +227,17 @@ class ResultsStorage(AbstractResultsContainer):
f'Writing batch of {len(batch)} matchup entries |
({writing}/{n_inserts}) [{writing / n_inserts * 100:7.3f}%]')
for entry in batch:
- futures.append(self._session.execute_async(insertStatement,
entry))
+ futures.append((entry,
self._session.execute_async(insertStatement, entry)))
- for future in futures:
+ for entry, future in futures:
try:
future.result()
except Exception:
move_successful = False
+ failed.append(entry)
self._log.info('Result data write attempt completed')
- return move_successful
+ return move_successful, failed
def __prepare_result(self, execution_id, primaryId, result,
insertStatement):
if 'primary' in result: