This is an automated email from the ASF dual-hosted git repository.
nchung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/master by this push:
new c2e7e7b SDAP-402: Update matchOnce logic (#202)
c2e7e7b is described below
commit c2e7e7b40afc8ce287ef57fe8a26a6b40679d5fd
Author: Stepheny Perez <[email protected]>
AuthorDate: Wed Oct 12 08:57:38 2022 -0700
SDAP-402: Update matchOnce logic (#202)
* Fixed matchup tests
* test data update to fix matchup test
* Updated matchup matchOnce logic
* Added new test case for matchOnce fix
* removed debugging statements
* removed debugging statements
* Updated changelog
---
CHANGELOG.md | 1 +
analysis/tests/algorithms_spark/test_matchup.py | 83 ++++++++++++++++++++++---
analysis/tests/data/edge_response.json | 23 +++++--
analysis/webservice/algorithms_spark/Matchup.py | 37 ++++++++---
4 files changed, 123 insertions(+), 21 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6b0e897..53da7c8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- SDAP-390: Changed `/doms` to `/cdms` and `doms_reader.py` to `cdms_reader.py`
- domslist endpoint points to AWS insitu instead of doms insitu
- Matchup returns numSecondary and numPrimary counts rather than insitu/gridded
+- SDAP-402: Changed matchup matchOnce logic to match multiple points if same time/space
- Bumped ingress timeout in Helm chart to reflect AWS gateway timeout
### Deprecated
### Removed
diff --git a/analysis/tests/algorithms_spark/test_matchup.py b/analysis/tests/algorithms_spark/test_matchup.py
index 7aee128..eba1e5f 100644
--- a/analysis/tests/algorithms_spark/test_matchup.py
+++ b/analysis/tests/algorithms_spark/test_matchup.py
@@ -27,7 +27,7 @@ from nexustiles.model.nexusmodel import Tile, TileVariable
from pyspark.sql import SparkSession
from shapely import wkt
from shapely.geometry import box
-from webservice.algorithms_spark.Matchup import DomsPoint, Matchup, DataPoint
+from webservice.algorithms_spark.Matchup import DomsPoint, Matchup, DataPoint, spark_matchup_driver
class MockSparkParam:
@@ -93,6 +93,7 @@ def setup_mock_tile_service(tile):
tile_service.get_min_time.return_value = 1627490285
tile_service.get_max_time.return_value = 1627490285
tile_service.mask_tiles_to_polygon.return_value = [tile]
+ tile_service.find_tiles_in_polygon.return_value = [tile]
return tile_service_factory
@@ -100,15 +101,18 @@ def test_doms_point_is_pickleable():
edge_point = {
'id': 'argo-profiles-5903995(46, 0)',
'time': '2012-10-15T14:24:04Z',
- 'point': '-33.467 29.728',
+ 'longitude': -33.467,
+ 'latitude': 29.728,
'sea_water_temperature': 24.5629997253,
'sea_water_temperature_depth': 2.9796258642,
'wind_speed': None,
'sea_water_salinity': None,
'sea_water_salinity_depth': None,
- 'platform': 4,
'device': 3,
- 'fileurl': 'ftp://podaac-ftp.jpl.nasa.gov/allData/argo-profiles-5903995.nc'
+ 'fileurl': 'ftp://podaac-ftp.jpl.nasa.gov/allData/argo-profiles-5903995.nc',
+ 'platform': {
+ 'code': 4
+ }
}
point = DomsPoint.from_edge_point(edge_point)
assert pickle.dumps(point) is not None
@@ -245,8 +249,8 @@ def test_calc(test_matchup_args):
assert json_matchup_result['data'][1]['matches'][0]['secondary'][0]['variable_value'] == 30.0
assert json_matchup_result['data'][1]['matches'][1]['secondary'][0]['variable_value'] == 40.0
- assert json_matchup_result['details']['numInSituMatched'] == 4
- assert json_matchup_result['details']['numGriddedMatched'] == 2
+ assert json_matchup_result['details']['numSecondaryMatched'] == 4
+ assert json_matchup_result['details']['numPrimaryMatched'] == 2
def test_match_satellite_to_insitu(test_dir, test_tile, test_matchup_args):
@@ -302,13 +306,13 @@ def test_match_satellite_to_insitu(test_dir, test_tile, test_matchup_args):
platforms = '1,2,3,4,5,6,7,8,9'
with mock.patch(
- 'webservice.algorithms_spark.Matchup.edge_endpoints.getEndpointByName'
+ 'webservice.algorithms_spark.Matchup.edge_endpoints.get_provider_name'
) as mock_edge_endpoints:
# Test the satellite->insitu branch
# By mocking the getEndpointsByName function we are forcing
# Matchup to think this dummy matchup dataset is an insitu
# dataset
- mock_edge_endpoints.return_value = {'url': 'http://test-edge-url'}
+ mock_edge_endpoints.return_value = 'some-provider'
matchup.query_edge = lambda *args, **kwargs: json.load(
open(os.path.join(test_dir, 'edge_response.json')))
@@ -426,13 +430,13 @@ def test_multi_variable_matchup(test_dir, test_tile, test_matchup_args):
test_matchup_args['tile_service_factory'] = setup_mock_tile_service(test_tile)
with mock.patch(
- 'webservice.algorithms_spark.Matchup.edge_endpoints.getEndpointByName'
+ 'webservice.algorithms_spark.Matchup.edge_endpoints.get_provider_name'
) as mock_edge_endpoints:
# Test the satellite->insitu branch
# By mocking the getEndpointsByName function we are forcing
# Matchup to think this dummy matchup dataset is an insitu
# dataset
- mock_edge_endpoints.return_value = {'url': 'http://test-edge-url'}
+ mock_edge_endpoints.return_value = 'some-provider'
matchup.query_edge = lambda *args, **kwargs: json.load(
open(os.path.join(test_dir, 'edge_response.json')))
@@ -526,3 +530,62 @@ def test_multi_variable_satellite_to_satellite_matchup(test_dir, test_tile, test
assert len(matchup_result[0][1].data) == 2
assert len(matchup_result[1][0].data) == 2
assert len(matchup_result[1][1].data) == 2
+
+
+def test_match_once_keep_duplicates():
+ """
+ Ensure duplicate points (in space and time) are maintained when
+ matchup is called with matchOnce=True. Multiple points with the
+ same space/time should be kept even if they have different
+ depth/devices
+ """
+ primary_doms_point = DomsPoint(longitude=1.0, latitude=1.0, time='2017-07-01T00:00:00Z', depth=None, data_id = 'primary')
+ secondary_doms_point_1 = DomsPoint(longitude=2.0, latitude=2.0, time='2017-07-02T00:00:00Z', depth=-2, data_id = 'secondary1')
+ secondary_doms_point_2 = DomsPoint(longitude=2.0, latitude=2.0, time='2017-07-02T00:00:00Z', depth=-3, data_id = 'secondary2')
+ secondary_doms_point_3 = DomsPoint(longitude=100.0, latitude=50.0, time='2017-07-05T00:00:00Z', depth=0, data_id = 'secondary3')
+
+ primary_doms_point.data = []
+ secondary_doms_point_1.data = []
+ secondary_doms_point_2.data = []
+ secondary_doms_point_3.data = []
+
+ patch_generators = [
+ (primary_doms_point, secondary_doms_point_3),
+ (primary_doms_point, secondary_doms_point_2),
+ (primary_doms_point, secondary_doms_point_1)
+ ]
+
+ spark = SparkSession.builder.appName('nexus-analysis').getOrCreate()
+ spark_context = spark.sparkContext
+
+ with mock.patch(
+ 'webservice.algorithms_spark.Matchup.match_satellite_to_insitu',
+ ) as mock_match_satellite_to_insitu, mock.patch(
+ 'webservice.algorithms_spark.Matchup.determine_parallelism'
+ ) as mock_determine_parallelism:
+ # Mock the actual call to generate a matchup. Hardcode response
+ # to test this scenario
+ mock_match_satellite_to_insitu.return_value = patch_generators
+ mock_determine_parallelism.return_value = 1
+
+ match_result = spark_matchup_driver(
+ tile_ids=['test'],
+ bounding_wkt='',
+ primary_ds_name='',
+ secondary_ds_names='',
+ parameter='',
+ depth_min=0,
+ depth_max=0,
+ time_tolerance=2000000,
+ radius_tolerance=0,
+ platforms='',
+ match_once=True,
+ tile_service_factory=lambda x: None,
+ sc=spark_context
+ )
+ assert len(match_result) == 1
+ secondary_points = match_result[list(match_result.keys())[0]]
+ assert len(secondary_points) == 2
+ for point in secondary_points:
+ assert point.data_id in [secondary_doms_point_1.data_id, secondary_doms_point_2.data_id]
+ assert point.data_id != secondary_doms_point_3.data_id
diff --git a/analysis/tests/data/edge_response.json b/analysis/tests/data/edge_response.json
index 99be82d..17ee094 100644
--- a/analysis/tests/data/edge_response.json
+++ b/analysis/tests/data/edge_response.json
@@ -1,23 +1,38 @@
{
- "totalResults": 3,
+ "total": 3,
"results": [
{
"point": "Point(5.0 15.0)",
+ "longitude": 5.0,
+ "latitude": 15.0,
"time": 1595954285,
"sea_water_temperature": 10.0,
- "id": 1234
+ "id": 1234,
+ "platform": {
+ "code": "30"
+ }
},
{
"point": "Point(10.0 10.0)",
+ "longitude": 10.0,
+ "latitude": 10.0,
"time": 1595954286,
"sea_water_temperature": 20.0,
- "id": 1235
+ "id": 1235,
+ "platform": {
+ "code": "30"
+ }
},
{
"point": "Point(18.0 3.0)",
+ "longitude": 18.0,
+ "latitude": 3.0,
"time": 1595954287,
"sea_water_temperature": 30.0,
- "id": 1236
+ "id": 1236,
+ "platform": {
+ "code": "30"
+ }
}
]
}
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
index 7711691..5c5ec74 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -363,7 +363,6 @@ class DomsPoint(object):
self.data = None
self.source = None
- self.depth = None
self.platform = None
self.device = None
self.file_url = None
@@ -572,7 +571,7 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
parameter_b = sc.broadcast(parameter)
# Parallelize list of tile ids
- rdd = sc.parallelize(tile_ids, determine_parllelism(len(tile_ids)))
+ rdd = sc.parallelize(tile_ids, determine_parallelism(len(tile_ids)))
# Map Partitions ( list(tile_id) )
rdd_filtered = rdd.mapPartitions(
@@ -615,10 +614,34 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
matchup_time = iso_time_to_epoch(matchup.time)
return abs(primary_time - matchup_time)
- rdd_filtered = rdd_filtered \
.map(lambda primary_matchup: tuple([primary_matchup[0], tuple([primary_matchup[1], dist(primary_matchup[0], primary_matchup[1])])])) \
-        .reduceByKey(lambda match_1, match_2: match_1 if match_1[1] < match_2[1] else match_2) \
- .mapValues(lambda x: [x[0]])
+ def filter_closest(matches):
+ """
+ Filter given matches. Find the closest match to the primary
+ point and only keep other matches that match the same
+ time/space as that point.
+
+ :param matches: List of match tuples. Each tuple has the following format:
+ 1. The secondary match
+ 2. Tuple of form (space_dist, time_dist)
+ """
+ closest_point = min(matches, key=lambda match: match[1])[0]
+ matches = list(filter(
+ lambda match: match.latitude == closest_point.latitude and
+ match.longitude == closest_point.longitude and
+ match.time == closest_point.time, map(
+ lambda match: match[0], matches
+ )
+ ))
+ return matches
+
+ rdd_filtered = rdd_filtered.map(
+ lambda primary_matchup: tuple(
+ [primary_matchup[0], tuple([primary_matchup[1], dist(primary_matchup[0], primary_matchup[1])])]
+ )).combineByKey(
+ lambda value: [value],
+ lambda value_list, value: value_list + [value],
+ lambda value_list_a, value_list_b: value_list_a + value_list_b
+ ).mapValues(lambda matches: filter_closest(matches))
else:
rdd_filtered = rdd_filtered \
.combineByKey(lambda value: [value], # Create 1 element list
@@ -630,7 +653,7 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
return result_as_map
-def determine_parllelism(num_tiles):
+def determine_parallelism(num_tiles):
"""
Try to stay at a maximum of 140 tiles per partition; But don't go over 128 partitions.
Also, don't go below the default of 8