This is an automated email from the ASF dual-hosted git repository.

skperez pushed a commit to branch SDAP-455
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git

commit 8feb2ba6f669bd23ac5e17583b3d426fe30d8b00
Merge: 8725ff6 5995f84
Author: skorper <[email protected]>
AuthorDate: Tue May 9 17:52:52 2023 -0700

    Merge remote-tracking branch 'origin' into SDAP-455

 CHANGELOG.md                                    |   2 +
 analysis/tests/algorithms_spark/test_matchup.py |  68 +++++++++++++
 analysis/webservice/algorithms_spark/Matchup.py |  28 ++++--
 analysis/webservice/apidocs/openapi.yml         |  13 +++
 docs/granule-download.sh                        | 127 ++++++++++++++++++++++++
 5 files changed, 232 insertions(+), 6 deletions(-)

diff --cc CHANGELOG.md
index 5e524f2,841ed00..4e03bb4
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@@ -8,7 -8,7 +8,8 @@@ and this project adheres to [Semantic V
  ### Added
  - Deletebyquery: Parameter to set the number of rows to fetch from Solr. Speeds up time to gather tiles to delete; especially when there is a lot of them.
  - Added Saildrone's `baja_2018` insitu dataset.
+ - SDAP-454: Added new query parameter `prioritizeDistance` to matchup algorithm
 +- SDAP-455: Large job tracking
  ### Changed
  - SDAP-443:
    - Replacing DOMS terminology with CDMS terminology:
diff --cc analysis/webservice/algorithms_spark/Matchup.py
index fd998e6,6cd4676..bc70e14
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@@ -222,75 -227,44 +232,75 @@@ class Matchup(NexusCalcSparkTornadoHand
          return bounding_polygon, primary_ds_name, secondary_ds_names, 
parameter_s, \
                 start_time, start_seconds_from_epoch, end_time, 
end_seconds_from_epoch, \
                 depth_min, depth_max, time_tolerance, radius_tolerance, \
-                platforms, match_once, result_size_limit
+                platforms, match_once, result_size_limit, prioritize_distance
  
 -    def calc(self, request, **args):
 +    def async_calc(self, execution_id, tile_ids, bounding_polygon, 
primary_ds_name,
 +                   secondary_ds_names, parameter_s, start_time, end_time, 
depth_min,
 +                   depth_max, time_tolerance, radius_tolerance, platforms, 
match_once,
 +                   result_size_limit, start):
 +        # Call spark_matchup
 +        self.log.debug("Calling Spark Driver")
 +
 +        try:
 +            self._sc.setJobGroup(execution_id, execution_id)
 +            spark_result = spark_matchup_driver(
 +                tile_ids, wkt.dumps(bounding_polygon),
 +                primary_ds_name,
 +                secondary_ds_names,
 +                parameter_s,
 +                depth_min,
 +                depth_max, time_tolerance,
 +                radius_tolerance,
 +                platforms,
 +                match_once,
 +                self.tile_service_factory,
 +                sc=self._sc
 +            )
 +        except Exception as error:
 +            self.log.exception(error)
 +            end = datetime.utcnow()
 +            with ResultsStorage(self.config) as storage:
 +                storage.updateExecution(
 +                    uuid.UUID(execution_id),
 +                    completeTime=end,
 +                    status=ExecutionStatus.FAILED.value,
 +                    message=error,
 +                    stats=None,
 +                    results=None
 +                )
 +            return
 +
 +        self.log.debug("Building and saving results")
 +        end = datetime.utcnow()
 +
 +        total_keys = len(list(spark_result.keys()))
 +        total_values = sum(len(v) for v in spark_result.values())
 +        details = {
 +            "timeToComplete": int((end - start).total_seconds()),
 +            "numSecondaryMatched": total_values,
 +            "numPrimaryMatched": total_keys
 +        }
 +
 +        matches = Matchup.convert_to_matches(spark_result)
 +
 +        with ResultsStorage(self.config) as storage:
 +            storage.updateExecution(
 +                uuid.UUID(execution_id),
 +                completeTime=end,
 +                status=ExecutionStatus.SUCCESS.value,
 +                message=None,
 +                stats=details,
 +                results=matches
 +            )
 +
 +    def calc(self, request, tornado_io_loop, **args):
          start = datetime.utcnow()
          # TODO Assuming Satellite primary
          bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \
          start_time, start_seconds_from_epoch, end_time, 
end_seconds_from_epoch, \
          depth_min, depth_max, time_tolerance, radius_tolerance, \
-         platforms, match_once, result_size_limit = 
self.parse_arguments(request)
+         platforms, match_once, result_size_limit, prioritize_distance = 
self.parse_arguments(request)
  
 -        with ResultsStorage(self.config) as resultsStorage:
 -
 -            execution_id = str(resultsStorage.insertExecution(None, start, 
None, None))
 -
 -        self.log.debug("Querying for tiles in search domain")
 -        # Get tile ids in box
 -        tile_ids = [tile.tile_id for tile in
 -                    
self._get_tile_service().find_tiles_in_polygon(bounding_polygon, 
primary_ds_name,
 -                                                             
start_seconds_from_epoch, end_seconds_from_epoch,
 -                                                             
fetch_data=False, fl='id',
 -                                                             
sort=['tile_min_time_dt asc', 'tile_min_lon asc',
 -                                                                   
'tile_min_lat asc'], rows=5000)]
 -
 -        self.log.info('Found %s tile_ids', len(tile_ids))
 -        # Call spark_matchup
 -        self.log.debug("Calling Spark Driver")
 -        try:
 -            spark_result = spark_matchup_driver(tile_ids, 
wkt.dumps(bounding_polygon), primary_ds_name,
 -                                                secondary_ds_names, 
parameter_s, depth_min, depth_max, time_tolerance,
 -                                                radius_tolerance, platforms, 
match_once, self.tile_service_factory,
 -                                                prioritize_distance, 
sc=self._sc)
 -        except Exception as e:
 -            self.log.exception(e)
 -            raise NexusProcessingException(reason="An unknown error occurred 
while computing matches", code=500)
 -
 -        end = datetime.utcnow()
 -
 -        self.log.debug("Building and saving results")
          args = {
              "primary": primary_ds_name,
              "matchup": secondary_ds_names,

Reply via email to