This is an automated email from the ASF dual-hosted git repository. skperez pushed a commit to branch SDAP-455 in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit 8feb2ba6f669bd23ac5e17583b3d426fe30d8b00 Merge: 8725ff6 5995f84 Author: skorper <[email protected]> AuthorDate: Tue May 9 17:52:52 2023 -0700 Merge remote-tracking branch 'origin' into SDAP-455 CHANGELOG.md | 2 + analysis/tests/algorithms_spark/test_matchup.py | 68 +++++++++++++ analysis/webservice/algorithms_spark/Matchup.py | 28 ++++-- analysis/webservice/apidocs/openapi.yml | 13 +++ docs/granule-download.sh | 127 ++++++++++++++++++++++++ 5 files changed, 232 insertions(+), 6 deletions(-) diff --cc CHANGELOG.md index 5e524f2,841ed00..4e03bb4 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@@ -8,7 -8,7 +8,8 @@@ and this project adheres to [Semantic V ### Added - Deletebyquery: Parameter to set the number of rows to fetch from Solr. Speeds up time to gather tiles to delete; especially when there is a lot of them. - Added Saildrone's `baja_2018` insitu dataset. + - SDAP-454: Added new query parameter `prioritizeDistance` to matchup algorithm +- SDAP-455: Large job tracking ### Changed - SDAP-443: - Replacing DOMS terminology with CDMS terminology: diff --cc analysis/webservice/algorithms_spark/Matchup.py index fd998e6,6cd4676..bc70e14 --- a/analysis/webservice/algorithms_spark/Matchup.py +++ b/analysis/webservice/algorithms_spark/Matchup.py @@@ -222,75 -227,44 +232,75 @@@ class Matchup(NexusCalcSparkTornadoHand return bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \ start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \ depth_min, depth_max, time_tolerance, radius_tolerance, \ - platforms, match_once, result_size_limit + platforms, match_once, result_size_limit, prioritize_distance - def calc(self, request, **args): + def async_calc(self, execution_id, tile_ids, bounding_polygon, primary_ds_name, + secondary_ds_names, parameter_s, start_time, end_time, depth_min, + depth_max, time_tolerance, radius_tolerance, platforms, match_once, + result_size_limit, start): + # Call spark_matchup + self.log.debug("Calling Spark Driver") + + try: + self._sc.setJobGroup(execution_id, execution_id) + spark_result = spark_matchup_driver( + tile_ids, wkt.dumps(bounding_polygon), + primary_ds_name, + secondary_ds_names, + parameter_s, + depth_min, + depth_max, time_tolerance, + radius_tolerance, + platforms, + match_once, + self.tile_service_factory, + sc=self._sc + ) + except Exception as error: + self.log.exception(error) + end = datetime.utcnow() + with ResultsStorage(self.config) as storage: + storage.updateExecution( + uuid.UUID(execution_id), + completeTime=end, + status=ExecutionStatus.FAILED.value, + message=error, + stats=None, + results=None + ) + return + + self.log.debug("Building and saving results") + end = datetime.utcnow() + + total_keys = len(list(spark_result.keys())) + total_values = sum(len(v) for v in spark_result.values()) + details = { + "timeToComplete": int((end - start).total_seconds()), + "numSecondaryMatched": total_values, + "numPrimaryMatched": total_keys + } + + matches = Matchup.convert_to_matches(spark_result) + + with ResultsStorage(self.config) as storage: + storage.updateExecution( + uuid.UUID(execution_id), + completeTime=end, + status=ExecutionStatus.SUCCESS.value, + message=None, + stats=details, + results=matches + ) + + def calc(self, request, tornado_io_loop, **args): start = datetime.utcnow() # TODO Assuming Satellite primary bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \ start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \ depth_min, depth_max, time_tolerance, radius_tolerance, \ - platforms, match_once, result_size_limit = self.parse_arguments(request) + platforms, match_once, result_size_limit, prioritize_distance = self.parse_arguments(request) - with ResultsStorage(self.config) as resultsStorage: - - execution_id = str(resultsStorage.insertExecution(None, start, None, None)) - - self.log.debug("Querying for tiles in search domain") - # Get tile ids in box - tile_ids = [tile.tile_id for tile in - self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name, - start_seconds_from_epoch, end_seconds_from_epoch, - fetch_data=False, fl='id', - sort=['tile_min_time_dt asc', 'tile_min_lon asc', - 'tile_min_lat asc'], rows=5000)] - - self.log.info('Found %s tile_ids', len(tile_ids)) - # Call spark_matchup - self.log.debug("Calling Spark Driver") - try: - spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name, - secondary_ds_names, parameter_s, depth_min, depth_max, time_tolerance, - radius_tolerance, platforms, match_once, self.tile_service_factory, - prioritize_distance, sc=self._sc) - except Exception as e: - self.log.exception(e) - raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500) - - end = datetime.utcnow() - - self.log.debug("Building and saving results") args = { "primary": primary_ds_name, "matchup": secondary_ds_names,
