This is an automated email from the ASF dual-hosted git repository. skperez pushed a commit to branch SDAP-454 in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit 51421a6a47881b07946ecbb041a0c1ce0e9af891 Author: skorper <[email protected]> AuthorDate: Thu Mar 30 14:45:42 2023 -0700 Added new query parameter to matchup algorithm --- CHANGELOG.md | 1 + analysis/tests/algorithms_spark/test_matchup.py | 68 +++++++++++++++++++++++++ analysis/webservice/algorithms_spark/Matchup.py | 31 ++++++++--- analysis/webservice/apidocs/openapi.yml | 13 +++++ 4 files changed, 106 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b03fda5..d76168f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added - Deletebyquery: Parameter to set the number of rows to fetch from Solr. Speeds up time to gather tiles to delete; especially when there is a lot of them. +- SDAP-454: Added new query parameter `prioritizeDistance` to matchup algorithm ### Changed - SDAP-443: - Replacing DOMS terminology with CDMS terminology: diff --git a/analysis/tests/algorithms_spark/test_matchup.py b/analysis/tests/algorithms_spark/test_matchup.py index eba1e5f..47d00c3 100644 --- a/analysis/tests/algorithms_spark/test_matchup.py +++ b/analysis/tests/algorithms_spark/test_matchup.py @@ -589,3 +589,71 @@ def test_match_once_keep_duplicates(): for point in secondary_points: assert point.data_id in [secondary_doms_point_1.data_id, secondary_doms_point_2.data_id] assert point.data_id != secondary_doms_point_3.data_id + + +def test_prioritize_distance(): + """ + Ensure that distance is prioritized over time when prioritizeDistance=True, and + that time is prioritized over distance when prioritizeDistance=False. 
+ """ + primary_doms_point = DomsPoint(longitude=1.0, latitude=1.0, time='2017-07-01T00:00:00Z', + depth=None, data_id='primary') + # Close in space, far in time + secondary_doms_point_1 = DomsPoint(longitude=2.0, latitude=2.0, time='2017-07-08T00:00:00Z', + depth=-1, data_id='secondary1') + # Far in space, close in time + secondary_doms_point_2 = DomsPoint(longitude=90.0, latitude=90.0, time='2017-07-01T00:00:01Z', + depth=-1, data_id='secondary2') + + primary_doms_point.data = [] + secondary_doms_point_1.data = [] + secondary_doms_point_2.data = [] + + patch_generators = [ + (primary_doms_point, secondary_doms_point_1), + (primary_doms_point, secondary_doms_point_2), + ] + + spark = SparkSession.builder.appName('nexus-analysis').getOrCreate() + spark_context = spark.sparkContext + + with mock.patch( + 'webservice.algorithms_spark.Matchup.match_satellite_to_insitu', + ) as mock_match_satellite_to_insitu, mock.patch( + 'webservice.algorithms_spark.Matchup.determine_parallelism' + ) as mock_determine_parallelism: + # Mock the actual call to generate a matchup. 
Hardcode response + # to test this scenario + mock_match_satellite_to_insitu.return_value = patch_generators + mock_determine_parallelism.return_value = 1 + + match_params = { + 'tile_ids': ['test'], + 'bounding_wkt': '', + 'primary_ds_name': '', + 'secondary_ds_names': '', + 'parameter': '', + 'depth_min': 0, + 'depth_max': 0, + 'time_tolerance': 2000000, + 'radius_tolerance': 0, + 'platforms': '', + 'match_once': True, + 'tile_service_factory': lambda x: None, + 'prioritize_distance': True, + 'sc': spark_context + } + + match_result = spark_matchup_driver(**match_params) + assert len(match_result) == 1 + secondary_points = match_result[list(match_result.keys())[0]] + assert len(secondary_points) == 1 + assert secondary_points[0].data_id == secondary_doms_point_1.data_id + + match_params['prioritize_distance'] = False + + match_result = spark_matchup_driver(**match_params) + assert len(match_result) == 1 + secondary_points = match_result[list(match_result.keys())[0]] + assert len(secondary_points) == 1 + assert secondary_points[0].data_id == secondary_doms_point_2.data_id diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py index 1274b64..25251cd 100644 --- a/analysis/webservice/algorithms_spark/Matchup.py +++ b/analysis/webservice/algorithms_spark/Matchup.py @@ -142,6 +142,13 @@ class Matchup(NexusCalcSparkHandler): "If the number of primary matches is greater than this limit, the service will respond with " "(HTTP 202: Accepted) and an empty response body. A value of 0 means return all results. " "Default: 500" + }, + "prioritizeDistance": { + "name": "Prioritize distance", + "type": "boolean", + "description": "If true, prioritize distance over time when computing matches. If false, prioritize time over " + "distance. This is only relevant if matchOnce=true, because otherwise all matches will be " + "included so long as they fit within the user-provided tolerances. Default is true." 
} } singleton = True @@ -214,10 +221,13 @@ class Matchup(NexusCalcSparkHandler): start_seconds_from_epoch = int((start_time - EPOCH).total_seconds()) end_seconds_from_epoch = int((end_time - EPOCH).total_seconds()) + prioritize_distance = request.get_boolean_arg("prioritizeDistance", default=True) + + return bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \ start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \ depth_min, depth_max, time_tolerance, radius_tolerance, \ - platforms, match_once, result_size_limit + platforms, match_once, result_size_limit, prioritize_distance def calc(self, request, **args): start = datetime.utcnow() @@ -225,7 +235,7 @@ class Matchup(NexusCalcSparkHandler): bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \ start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \ depth_min, depth_max, time_tolerance, radius_tolerance, \ - platforms, match_once, result_size_limit = self.parse_arguments(request) + platforms, match_once, result_size_limit, prioritize_distance = self.parse_arguments(request) with ResultsStorage(self.config) as resultsStorage: @@ -246,7 +256,8 @@ class Matchup(NexusCalcSparkHandler): try: spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name, secondary_ds_names, parameter_s, depth_min, depth_max, time_tolerance, - radius_tolerance, platforms, match_once, self.tile_service_factory, sc=self._sc) + radius_tolerance, platforms, match_once, self.tile_service_factory, + prioritize_distance, sc=self._sc) except Exception as e: self.log.exception(e) raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500) @@ -564,7 +575,7 @@ DRIVER_LOCK = Lock() def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_names, parameter, depth_min, depth_max, - time_tolerance, radius_tolerance, platforms, match_once, tile_service_factory, sc=None): + time_tolerance, 
radius_tolerance, platforms, match_once, tile_service_factory, prioritize_distance=True, sc=None): from functools import partial with DRIVER_LOCK: @@ -611,12 +622,14 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n # Method used for calculating the distance between 2 DomsPoints from pyproj import Geod - def dist(primary, matchup): + def dist(primary, matchup, prioritize_distance): wgs84_geod = Geod(ellps='WGS84') lat1, lon1 = (primary.latitude, primary.longitude) lat2, lon2 = (matchup.latitude, matchup.longitude) az12, az21, distance = wgs84_geod.inv(lon1, lat1, lon2, lat2) - return distance, time_dist(primary, matchup) + if prioritize_distance: + return distance, time_dist(primary, matchup) + return time_dist(primary, matchup), distance def time_dist(primary, matchup): primary_time = iso_time_to_epoch(primary.time) @@ -645,7 +658,11 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n rdd_filtered = rdd_filtered.map( lambda primary_matchup: tuple( - [primary_matchup[0], tuple([primary_matchup[1], dist(primary_matchup[0], primary_matchup[1])])] + [primary_matchup[0], tuple([primary_matchup[1], dist( + primary_matchup[0], + primary_matchup[1], + prioritize_distance + )])] )).combineByKey( lambda value: [value], lambda value_list, value: value_list + [value], diff --git a/analysis/webservice/apidocs/openapi.yml b/analysis/webservice/apidocs/openapi.yml index 55802cd..206278e 100644 --- a/analysis/webservice/apidocs/openapi.yml +++ b/analysis/webservice/apidocs/openapi.yml @@ -167,6 +167,19 @@ paths: type: integer default: 500 example: 500 + - in: query + name: prioritizeDistance + description: | + If true, prioritize distance over time when computing matches. + If false, prioritize time over distance. This is only relevant if + matchOnce=true, because otherwise all matches will be included + so long as they fit within the user-provided tolerances. + Default is true. 
+ required: false + schema: + type: boolean + default: true + example: true responses: '200': description: Successful operation