This is an automated email from the ASF dual-hosted git repository.

nchung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/master by this push:
     new c2e7e7b  SDAP-402: Update matchOnce logic (#202)
c2e7e7b is described below

commit c2e7e7b40afc8ce287ef57fe8a26a6b40679d5fd
Author: Stepheny Perez <[email protected]>
AuthorDate: Wed Oct 12 08:57:38 2022 -0700

    SDAP-402: Update matchOnce logic (#202)
    
    * Fixed matchup tests
    
    * test data update to fix matchup test
    
    * Updated matchup matchOnce logic
    
    * Added new test case for matchOnce fix
    
    * removed debugging statements
    
    * removed debugging statements
    
    * Updated changelog
---
 CHANGELOG.md                                    |  1 +
 analysis/tests/algorithms_spark/test_matchup.py | 83 ++++++++++++++++++++++---
 analysis/tests/data/edge_response.json          | 23 +++++--
 analysis/webservice/algorithms_spark/Matchup.py | 37 ++++++++---
 4 files changed, 123 insertions(+), 21 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6b0e897..53da7c8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - SDAP-390: Changed `/doms` to `/cdms` and `doms_reader.py` to `cdms_reader.py`
 - domslist endpoint points to AWS insitu instead of doms insitu
 - Matchup returns numSecondary and numPrimary counts rather than insitu/gridded
+- SDAP-402: Changed matchup matchOnce logic to match multiple points if same time/space
 - Bumped ingress timeout in Helm chart to reflect AWS gateway timeout
 ### Deprecated
 ### Removed
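
In plain terms, the SDAP-402 change is: with matchOnce=True, the closest
secondary point is still selected per primary point, but any other secondary
points sharing that closest point's exact longitude, latitude, and time (e.g.
measurements at different depths or from different devices) are now kept as
well. A minimal standalone sketch of that selection rule, using simplified
namedtuples rather than the real DomsPoint class (illustrative only, not part
of the patch):

# Standalone sketch of the new matchOnce selection rule (not part of the
# patch). Point is a simplified stand-in for DomsPoint; the distances here
# are toy scalars, whereas the real code computes time/space deltas.
from collections import namedtuple

Point = namedtuple('Point', ['longitude', 'latitude', 'time', 'depth'])

def keep_closest_and_colocated(matches):
    # matches: list of (secondary_point, distance) tuples for one primary
    closest = min(matches, key=lambda m: m[1])[0]
    # Keep every secondary sharing the closest point's exact space/time,
    # even if depth (or device) differs -- this is the SDAP-402 change.
    return [m[0] for m in matches
            if m[0].longitude == closest.longitude
            and m[0].latitude == closest.latitude
            and m[0].time == closest.time]

near_a = Point(2.0, 2.0, '2017-07-02T00:00:00Z', depth=-2)
near_b = Point(2.0, 2.0, '2017-07-02T00:00:00Z', depth=-3)
far = Point(100.0, 50.0, '2017-07-05T00:00:00Z', depth=0)

kept = keep_closest_and_colocated([(far, 9.0), (near_a, 1.0), (near_b, 1.0)])
assert kept == [near_a, near_b]  # both co-located points survive; 'far' is dropped
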
diff --git a/analysis/tests/algorithms_spark/test_matchup.py b/analysis/tests/algorithms_spark/test_matchup.py
index 7aee128..eba1e5f 100644
--- a/analysis/tests/algorithms_spark/test_matchup.py
+++ b/analysis/tests/algorithms_spark/test_matchup.py
@@ -27,7 +27,7 @@ from nexustiles.model.nexusmodel import Tile, TileVariable
 from pyspark.sql import SparkSession
 from shapely import wkt
 from shapely.geometry import box
-from webservice.algorithms_spark.Matchup import DomsPoint, Matchup, DataPoint
+from webservice.algorithms_spark.Matchup import DomsPoint, Matchup, DataPoint, spark_matchup_driver
 
 
 class MockSparkParam:
@@ -93,6 +93,7 @@ def setup_mock_tile_service(tile):
     tile_service.get_min_time.return_value = 1627490285
     tile_service.get_max_time.return_value = 1627490285
     tile_service.mask_tiles_to_polygon.return_value = [tile]
+    tile_service.find_tiles_in_polygon.return_value = [tile]
     return tile_service_factory
 
 
@@ -100,15 +101,18 @@ def test_doms_point_is_pickleable():
     edge_point = {
         'id': 'argo-profiles-5903995(46, 0)',
         'time': '2012-10-15T14:24:04Z',
-        'point': '-33.467 29.728',
+        'longitude': -33.467,
+        'latitude': 29.728,
         'sea_water_temperature': 24.5629997253,
         'sea_water_temperature_depth': 2.9796258642,
         'wind_speed': None,
         'sea_water_salinity': None,
         'sea_water_salinity_depth': None,
-        'platform': 4,
         'device': 3,
-        'fileurl': 'ftp://podaac-ftp.jpl.nasa.gov/allData/argo-profiles-5903995.nc'
+        'fileurl': 'ftp://podaac-ftp.jpl.nasa.gov/allData/argo-profiles-5903995.nc',
+        'platform': {
+            'code': 4
+        }
     }
     point = DomsPoint.from_edge_point(edge_point)
     assert pickle.dumps(point) is not None
@@ -245,8 +249,8 @@ def test_calc(test_matchup_args):
         assert json_matchup_result['data'][1]['matches'][0]['secondary'][0]['variable_value'] == 30.0
         assert json_matchup_result['data'][1]['matches'][1]['secondary'][0]['variable_value'] == 40.0
 
-        assert json_matchup_result['details']['numInSituMatched'] == 4
-        assert json_matchup_result['details']['numGriddedMatched'] == 2
+        assert json_matchup_result['details']['numSecondaryMatched'] == 4
+        assert json_matchup_result['details']['numPrimaryMatched'] == 2
 
 
 def test_match_satellite_to_insitu(test_dir, test_tile, test_matchup_args):
@@ -302,13 +306,13 @@ def test_match_satellite_to_insitu(test_dir, test_tile, test_matchup_args):
     platforms = '1,2,3,4,5,6,7,8,9'
 
     with mock.patch(
-            'webservice.algorithms_spark.Matchup.edge_endpoints.getEndpointByName'
+            'webservice.algorithms_spark.Matchup.edge_endpoints.get_provider_name'
     ) as mock_edge_endpoints:
         # Test the satellite->insitu branch
         # By mocking the getEndpointsByName function we are forcing
         # Matchup to think this dummy matchup dataset is an insitu
         # dataset
-        mock_edge_endpoints.return_value = {'url': 'http://test-edge-url'}
+        mock_edge_endpoints.return_value = 'some-provider'
         matchup.query_edge = lambda *args, **kwargs: json.load(
             open(os.path.join(test_dir, 'edge_response.json')))
 
@@ -426,13 +430,13 @@ def test_multi_variable_matchup(test_dir, test_tile, test_matchup_args):
     test_matchup_args['tile_service_factory'] = setup_mock_tile_service(test_tile)
 
     with mock.patch(
-            'webservice.algorithms_spark.Matchup.edge_endpoints.getEndpointByName'
+            'webservice.algorithms_spark.Matchup.edge_endpoints.get_provider_name'
     ) as mock_edge_endpoints:
         # Test the satellite->insitu branch
         # By mocking the getEndpointsByName function we are forcing
         # Matchup to think this dummy matchup dataset is an insitu
         # dataset
-        mock_edge_endpoints.return_value = {'url': 'http://test-edge-url'}
+        mock_edge_endpoints.return_value = 'some-provider'
         matchup.query_edge = lambda *args, **kwargs: json.load(
             open(os.path.join(test_dir, 'edge_response.json')))
 
@@ -526,3 +530,62 @@ def test_multi_variable_satellite_to_satellite_matchup(test_dir, test_tile, test
         assert len(matchup_result[0][1].data) == 2
         assert len(matchup_result[1][0].data) == 2
         assert len(matchup_result[1][1].data) == 2
+
+
+def test_match_once_keep_duplicates():
+    """
+    Ensure duplicate points (in space and time) are maintained when
+    matchup is called with matchOnce=True. Multiple points with the
+    same space/time should be kept even if they have different
+    depth/devices
+    """
+    primary_doms_point = DomsPoint(longitude=1.0, latitude=1.0, time='2017-07-01T00:00:00Z', depth=None, data_id = 'primary')
+    secondary_doms_point_1 = DomsPoint(longitude=2.0, latitude=2.0, time='2017-07-02T00:00:00Z', depth=-2, data_id = 'secondary1')
+    secondary_doms_point_2 = DomsPoint(longitude=2.0, latitude=2.0, time='2017-07-02T00:00:00Z', depth=-3, data_id = 'secondary2')
+    secondary_doms_point_3 = DomsPoint(longitude=100.0, latitude=50.0, time='2017-07-05T00:00:00Z', depth=0, data_id = 'secondary3')
+
+    primary_doms_point.data = []
+    secondary_doms_point_1.data = []
+    secondary_doms_point_2.data = []
+    secondary_doms_point_3.data = []
+
+    patch_generators = [
+        (primary_doms_point, secondary_doms_point_3),
+        (primary_doms_point, secondary_doms_point_2),
+        (primary_doms_point, secondary_doms_point_1)
+    ]
+
+    spark = SparkSession.builder.appName('nexus-analysis').getOrCreate()
+    spark_context = spark.sparkContext
+
+    with mock.patch(
+        'webservice.algorithms_spark.Matchup.match_satellite_to_insitu',
+    ) as mock_match_satellite_to_insitu, mock.patch(
+        'webservice.algorithms_spark.Matchup.determine_parallelism'
+    ) as mock_determine_parallelism:
+        # Mock the actual call to generate a matchup. Hardcode response
+        # to test this scenario
+        mock_match_satellite_to_insitu.return_value = patch_generators
+        mock_determine_parallelism.return_value = 1
+
+        match_result = spark_matchup_driver(
+            tile_ids=['test'],
+            bounding_wkt='',
+            primary_ds_name='',
+            secondary_ds_names='',
+            parameter='',
+            depth_min=0,
+            depth_max=0,
+            time_tolerance=2000000,
+            radius_tolerance=0,
+            platforms='',
+            match_once=True,
+            tile_service_factory=lambda x: None,
+            sc=spark_context
+        )
+        assert len(match_result) == 1
+        secondary_points = match_result[list(match_result.keys())[0]]
+        assert len(secondary_points) == 2
+        for point in secondary_points:
+            assert point.data_id in [secondary_doms_point_1.data_id, secondary_doms_point_2.data_id]
+            assert point.data_id != secondary_doms_point_3.data_id
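
As the assertions above imply, spark_matchup_driver returns a dict keyed by
primary DomsPoint, with the list of matched secondary DomsPoints as each
value. A small self-contained sketch of consuming a result shaped that way
(stub objects stand in for DomsPoint; not part of the patch):

# Sketch: consuming a matchup result shaped like the one asserted above.
# 'match_result' mimics the dict returned by spark_matchup_driver.
from types import SimpleNamespace

primary = SimpleNamespace(data_id='primary')
match_result = {
    primary: [SimpleNamespace(data_id='secondary1'),
              SimpleNamespace(data_id='secondary2')],
}

for prim, secondaries in match_result.items():
    ids = [s.data_id for s in secondaries]
    print(f'{prim.data_id} -> {len(ids)} match(es): {ids}')
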
diff --git a/analysis/tests/data/edge_response.json b/analysis/tests/data/edge_response.json
index 99be82d..17ee094 100644
--- a/analysis/tests/data/edge_response.json
+++ b/analysis/tests/data/edge_response.json
@@ -1,23 +1,38 @@
 {
-  "totalResults": 3,
+  "total": 3,
   "results": [
     {
       "point": "Point(5.0 15.0)",
+      "longitude": 5.0,
+      "latitude": 15.0,
       "time": 1595954285,
       "sea_water_temperature": 10.0,
-      "id": 1234
+      "id": 1234,
+      "platform": {
+          "code": "30"
+      }
     },
     {
       "point": "Point(10.0 10.0)",
+      "longitude": 10.0,
+      "latitude": 10.0,
       "time": 1595954286,
       "sea_water_temperature": 20.0,
-      "id": 1235
+      "id": 1235,
+      "platform": {
+          "code": "30"
+      }
     },
     {
       "point": "Point(18.0 3.0)",
+      "longitude": 18.0,
+      "latitude": 3.0,
       "time": 1595954287,
       "sea_water_temperature": 30.0,
-      "id": 1236
+      "id": 1236,
+      "platform": {
+          "code": "30"
+      }
     }
   ]
 }
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 7711691..5c5ec74 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -363,7 +363,6 @@ class DomsPoint(object):
         self.data = None
 
         self.source = None
-        self.depth = None
         self.platform = None
         self.device = None
         self.file_url = None
@@ -572,7 +571,7 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
         parameter_b = sc.broadcast(parameter)
 
         # Parallelize list of tile ids
-        rdd = sc.parallelize(tile_ids, determine_parllelism(len(tile_ids)))
+        rdd = sc.parallelize(tile_ids, determine_parallelism(len(tile_ids)))
 
     # Map Partitions ( list(tile_id) )
     rdd_filtered = rdd.mapPartitions(
@@ -615,10 +614,34 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
             matchup_time = iso_time_to_epoch(matchup.time)
             return abs(primary_time - matchup_time)
 
-        rdd_filtered = rdd_filtered \
-            .map(lambda primary_matchup: tuple([primary_matchup[0], tuple([primary_matchup[1], dist(primary_matchup[0], primary_matchup[1])])])) \
-            .reduceByKey(lambda match_1, match_2: match_1 if match_1[1] < match_2[1] else match_2) \
-            .mapValues(lambda x: [x[0]])
+        def filter_closest(matches):
+            """
+            Filter given matches. Find the closest match to the primary
+            point and only keep other matches that match the same
+            time/space as that point.
+
+            :param matches: List of match tuples. Each tuple has the following format:
+                1. The secondary match
+                2. Tuple of form (space_dist, time_dist)
+            """
+            closest_point = min(matches, key=lambda match: match[1])[0]
+            matches = list(filter(
+                lambda match: match.latitude == closest_point.latitude and
+                              match.longitude == closest_point.longitude and
+                              match.time == closest_point.time, map(
+                    lambda match: match[0], matches
+                )
+            ))
+            return matches
+
+        rdd_filtered = rdd_filtered.map(
+            lambda primary_matchup: tuple(
+                [primary_matchup[0], tuple([primary_matchup[1], dist(primary_matchup[0], primary_matchup[1])])]
+            )).combineByKey(
+                lambda value: [value],
+                lambda value_list, value: value_list + [value],
+                lambda value_list_a, value_list_b: value_list_a + value_list_b
+            ).mapValues(lambda matches: filter_closest(matches))
     else:
         rdd_filtered = rdd_filtered \
             .combineByKey(lambda value: [value],  # Create 1 element list
@@ -630,7 +653,7 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
     return result_as_map
 
 
-def determine_parllelism(num_tiles):
+def determine_parallelism(num_tiles):
     """
     Try to stay at a maximum of 140 tiles per partition; But don't go over 128 partitions.
     Also, don't go below the default of 8
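
The crux of the Matchup.py change is the switch from reduceByKey to
combineByKey: reduceByKey can only carry one surviving match per primary key,
so ties at the same time/space were discarded, whereas combineByKey
accumulates the full candidate list per key and filter_closest then prunes
it. A standalone sketch of the three-function combineByKey contract
(createCombiner / mergeValue / mergeCombiners), emulated without Spark:

# Sketch of the combineByKey contract used above, emulated without Spark.
# Spark applies create_combiner to the first value seen for a key,
# merge_value to fold each further value into the per-partition list, and
# merge_combiners to join lists built on different partitions.
def combine_by_key(pairs, create_combiner, merge_value, merge_combiners):
    combined = {}
    for key, value in pairs:
        if key not in combined:
            combined[key] = create_combiner(value)
        else:
            combined[key] = merge_value(combined[key], value)
    return combined  # single partition here, so merge_combiners goes unused

pairs = [('p1', ('s1', 1.0)), ('p1', ('s2', 1.0)), ('p1', ('s3', 9.0))]
grouped = combine_by_key(
    pairs,
    create_combiner=lambda value: [value],
    merge_value=lambda value_list, value: value_list + [value],
    merge_combiners=lambda a, b: a + b,
)
# All three candidates for 'p1' survive grouping; a filter_closest-style
# pass can then keep every candidate tied with the minimum distance.
assert [v[0] for v in grouped['p1']] == ['s1', 's2', 's3']
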
