This is an automated email from the ASF dual-hosted git repository.
nchung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/master by this push:
new c07a689 SDAP-315: Updated matchup algorithm to support satellite to
satellite (#131)
c07a689 is described below
commit c07a689edbb4ef529c3c747820b2b8bd19d83a51
Author: Stepheny Perez <[email protected]>
AuthorDate: Wed Aug 4 17:21:12 2021 -0700
SDAP-315: Updated matchup algorithm to support satellite to satellite (#131)
* SDAP-315: Updated matchup algorithm to support satellite to satellite
* SDAP-315: Updated unit tests to a passing state. Moved integration tests
to a new test directory
---
analysis/tests/README.md | 23 ++
analysis/tests/algorithms_spark/Matchup_test.py | 321 ------------------
analysis/tests/algorithms_spark/test_matchup.py | 357 +++++++++++++++++++++
analysis/tests/conftest.py | 34 ++
analysis/tests/data/edge_response.json | 24 ++
.../integration/algorithms_spark/test_matchup.py | 111 +++++++
.../webservice/algorithms/doms/BaseDomsHandler.py | 2 +
analysis/webservice/algorithms_spark/Matchup.py | 145 +++++++--
8 files changed, 662 insertions(+), 355 deletions(-)
diff --git a/analysis/tests/README.md b/analysis/tests/README.md
new file mode 100644
index 0000000..a0f5cb9
--- /dev/null
+++ b/analysis/tests/README.md
@@ -0,0 +1,23 @@
+# Apache SDAP Testing
+
+## Unit Tests
+
+Unit tests don't require any externally running dependencies (like Solr
+or Cassandra). Unit tests, unlike integration tests, are not marked
+with the 'integration' pytest marker. To run all unit tests and skip
+integration tests, run the following command:
+
+```shell script
+pytest -m "not integration" analysis/tests/
+```
+
+## Integration Tests
+
+To run integration tests, Cassandra and Solr must be running locally.
+
+Integration tests have been marked with 'integration' pytest markers.
+In order to run only integration tests, run the following command:
+
+```shell script
+pytest -m "integration" analysis/tests/
+```
diff --git a/analysis/tests/algorithms_spark/Matchup_test.py
b/analysis/tests/algorithms_spark/Matchup_test.py
deleted file mode 100644
index bb39e70..0000000
--- a/analysis/tests/algorithms_spark/Matchup_test.py
+++ /dev/null
@@ -1,321 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import pickle
-import random
-import timeit
-import unittest
-
-from webservice.algorithms_spark.Matchup import *
-
-
-class TestMatch_Points(unittest.TestCase):
- def test_one_point_match_exact(self):
- primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0,
data_id=1)
- matchup = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0,
data_id=2)
-
- primary_points = [primary]
- matchup_points = [matchup]
-
- matches = list(match_points_generator(primary_points, matchup_points,
0))
-
- self.assertEqual(1, len(matches))
-
- p_match_point, match = matches[0]
-
- self.assertEqual(primary, p_match_point)
- self.assertEqual(matchup, match)
-
- def test_one_point_match_within_tolerance_150km(self):
- primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0,
data_id=1)
- matchup = DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=5.0,
data_id=2)
-
- primary_points = [primary]
- matchup_points = [matchup]
-
- matches = list(match_points_generator(primary_points, matchup_points,
150000)) # tolerance 150 km
-
- self.assertEqual(1, len(matches))
-
- p_match_point, match = matches[0]
-
- self.assertEqual(primary, p_match_point)
- self.assertEqual(matchup, match)
-
- def test_one_point_match_within_tolerance_200m(self):
- primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0,
data_id=1)
- matchup = DomsPoint(longitude=1.001, latitude=2.0, time=1000,
depth=5.0, data_id=2)
-
- primary_points = [primary]
- matchup_points = [matchup]
-
- matches = list(match_points_generator(primary_points, matchup_points,
200)) # tolerance 200 m
-
- self.assertEqual(1, len(matches))
-
- p_match_point, match = matches[0]
-
- self.assertEqual(primary, p_match_point)
- self.assertEqual(matchup, match)
-
- def test_one_point_not_match_tolerance_150km(self):
- primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0,
data_id=1)
- matchup = DomsPoint(longitude=1.0, latitude=4.0, time=1000, depth=5.0,
data_id=2)
-
- primary_points = [primary]
- matchup_points = [matchup]
-
- matches = list(match_points_generator(primary_points, matchup_points,
150000)) # tolerance 150 km
-
- self.assertEqual(0, len(matches))
-
- def test_one_point_not_match_tolerance_100m(self):
- primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0,
data_id=1)
- matchup = DomsPoint(longitude=1.001, latitude=2.0, time=1000,
depth=5.0, data_id=2)
-
- primary_points = [primary]
- matchup_points = [matchup]
-
- matches = list(match_points_generator(primary_points, matchup_points,
100)) # tolerance 100 m
-
- self.assertEqual(0, len(matches))
-
- def test_multiple_point_match(self):
- primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0,
data_id=1)
- primary_points = [primary]
-
- matchup_points = [
- DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=10.0,
data_id=2),
- DomsPoint(longitude=2.0, latitude=2.0, time=1000, depth=0.0,
data_id=3),
- DomsPoint(longitude=0.5, latitude=1.5, time=1000, depth=3.0,
data_id=4)
- ]
-
- matches = list(match_points_generator(primary_points, matchup_points,
150000)) # tolerance 150 km
-
- self.assertEqual(3, len(matches))
-
- self.assertSetEqual({primary}, {x[0] for x in matches})
-
- list_of_matches = [x[1] for x in matches]
-
- self.assertEqual(3, len(list_of_matches))
- self.assertItemsEqual(matchup_points, list_of_matches)
-
- def test_multiple_point_match_multiple_times(self):
- primary_points = [
- DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0,
data_id=1),
- DomsPoint(longitude=1.5, latitude=1.5, time=1000, depth=5.0,
data_id=2)
- ]
-
- matchup_points = [
- DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=10.0,
data_id=3),
- DomsPoint(longitude=2.0, latitude=2.0, time=1000, depth=0.0,
data_id=4),
- DomsPoint(longitude=0.5, latitude=1.5, time=1000, depth=3.0,
data_id=5)
- ]
-
- matches = list(match_points_generator(primary_points, matchup_points,
150000)) # tolerance 150 km
-
- self.assertEqual(5, len(matches))
-
- self.assertSetEqual({p for p in primary_points}, {x[0] for x in
matches})
-
- # First primary point matches all 3 secondary
- self.assertEqual(3, [x[0] for x in matches].count(primary_points[0]))
- self.assertItemsEqual(matchup_points, [x[1] for x in matches if x[0]
== primary_points[0]])
-
- # Second primary point matches only last 2 secondary
- self.assertEqual(2, [x[0] for x in matches].count(primary_points[1]))
- self.assertItemsEqual(matchup_points[1:], [x[1] for x in matches if
x[0] == primary_points[1]])
-
- def test_one_of_many_primary_matches_one_of_many_matchup(self):
- primary_points = [
- DomsPoint(longitude=-33.76764, latitude=30.42946, time=1351553994,
data_id=1),
- DomsPoint(longitude=-33.75731, latitude=29.86216, time=1351554004,
data_id=2)
- ]
-
- matchup_points = [
- DomsPoint(longitude=-33.762, latitude=28.877, time=1351521432,
depth=3.973, data_id=3),
- DomsPoint(longitude=-34.916, latitude=28.879, time=1351521770,
depth=2.9798, data_id=4),
- DomsPoint(longitude=-31.121, latitude=31.256, time=1351519892,
depth=4.07, data_id=5)
- ]
-
- matches = list(match_points_generator(primary_points, matchup_points,
110000)) # tolerance 110 km
-
- self.assertEqual(1, len(matches))
-
- self.assertSetEqual({p for p in primary_points if p.data_id == 2},
{x[0] for x in matches})
-
- # First primary point matches none
- self.assertEqual(0, [x[0] for x in matches].count(primary_points[0]))
-
- # Second primary point matches only first secondary
- self.assertEqual(1, [x[0] for x in matches].count(primary_points[1]))
- self.assertItemsEqual(matchup_points[0:1], [x[1] for x in matches if
x[0] == primary_points[1]])
-
- @unittest.skip("This test is just for timing, doesn't actually assert
anything.")
- def test_time_many_primary_many_matchup(self):
- import logging
- import sys
- logging.basicConfig(level=logging.DEBUG, format='%(asctime)s -
%(name)s - %(levelname)s - %(message)s',
- datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
- log = logging.getLogger(__name__)
- # Generate 160000 DomsPoints distributed equally in a box from -2.0
lat/lon to 2.0 lat/lon
- log.info("Generating primary points")
- x = np.arange(-2.0, 2.0, 0.01)
- y = np.arange(-2.0, 2.0, 0.01)
- primary_points = [DomsPoint(longitude=xy[0], latitude=xy[1],
time=1000, depth=5.0, data_id=i) for i, xy in
- enumerate(np.array(np.meshgrid(x, y)).T.reshape(-1,
2))]
-
- # Generate 2000 DomsPoints distributed randomly in a box from -2.0
lat/lon to 2.0 lat/lon
- log.info("Generating matchup points")
- matchup_points = [
- DomsPoint(longitude=random.uniform(-2.0, 2.0),
latitude=random.uniform(-2.0, 2.0), time=1000, depth=5.0,
- data_id=i) for i in range(0, 2000)]
-
- log.info("Starting matchup")
- log.info("Best of repeat(3, 2) matchups: %s seconds" % min(
- timeit.repeat(lambda: list(match_points_generator(primary_points,
matchup_points, 1500)), repeat=3,
- number=2)))
-
-
-class TestDOMSPoint(unittest.TestCase):
- def test_is_pickleable(self):
- edge_point = json.loads("""{
-"id": "argo-profiles-5903995(46, 0)",
-"time": "2012-10-15T14:24:04Z",
-"point": "-33.467 29.728",
-"sea_water_temperature": 24.5629997253,
-"sea_water_temperature_depth": 2.9796258642,
-"wind_speed": null,
-"sea_water_salinity": null,
-"sea_water_salinity_depth": null,
-"platform": 4,
-"device": 3,
-"fileurl":
"ftp://podaac-ftp.jpl.nasa.gov/allData/insitu/L2/spurs1/argo/argo-profiles-5903995.nc"
-}""")
- point = DomsPoint.from_edge_point(edge_point)
- self.assertIsNotNone(pickle.dumps(point))
-
-
-def check_all():
- return check_solr() and check_cass() and check_edge()
-
-
-def check_solr():
- # TODO eventually this might do something.
- return False
-
-
-def check_cass():
- # TODO eventually this might do something.
- return False
-
-
-def check_edge():
- # TODO eventually this might do something.
- return False
-
-
[email protected](check_all(),
- "These tests require local instances of Solr, Cassandra,
and Edge to be running.")
-class TestMatchup(unittest.TestCase):
- def setUp(self):
- from os import environ
- environ['PYSPARK_DRIVER_PYTHON'] =
'/Users/greguska/anaconda/envs/nexus-analysis/bin/python2.7'
- environ['PYSPARK_PYTHON'] =
'/Users/greguska/anaconda/envs/nexus-analysis/bin/python2.7'
- environ['SPARK_HOME'] =
'/Users/greguska/sandbox/spark-2.0.0-bin-hadoop2.7'
-
- def test_mur_match(self):
- from shapely.wkt import loads
- from nexustiles.nexustiles import NexusTileService
-
- polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00,
-34.98 31.00, -34.98 29.54))")
- primary_ds = "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1"
- matchup_ds = "spurs"
- parameter = "sst"
- start_time = 1350259200 # 2012-10-15T00:00:00Z
- end_time = 1350345600 # 2012-10-16T00:00:00Z
- time_tolerance = 86400
- depth_tolerance = 5.0
- radius_tolerance = 1500.0
- platforms = "1,2,3,4,5,6,7,8,9"
-
- tile_service = NexusTileService()
- tile_ids = [tile.tile_id for tile in
- tile_service.find_tiles_in_polygon(polygon, primary_ds,
start_time, end_time, fetch_data=False,
- fl='id')]
- result = spark_matchup_driver(tile_ids, wkt.dumps(polygon),
primary_ds, matchup_ds, parameter, time_tolerance,
- depth_tolerance, radius_tolerance,
platforms)
- for k, v in result.items():
- print("primary: %s\n\tmatches:\n\t\t%s" % (
- "lon: %s, lat: %s, time: %s, sst: %s" % (k.longitude,
k.latitude, k.time, k.sst),
- '\n\t\t'.join(
- ["lon: %s, lat: %s, time: %s, sst: %s" % (i.longitude,
i.latitude, i.time, i.sst) for i in v])))
-
- def test_smap_match(self):
- from shapely.wkt import loads
- from nexustiles.nexustiles import NexusTileService
-
- polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00,
-34.98 31.00, -34.98 29.54))")
- primary_ds = "SMAP_L2B_SSS"
- matchup_ds = "spurs"
- parameter = "sss"
- start_time = 1350259200 # 2012-10-15T00:00:00Z
- end_time = 1350345600 # 2012-10-16T00:00:00Z
- time_tolerance = 86400
- depth_tolerance = 5.0
- radius_tolerance = 1500.0
- platforms = "1,2,3,4,5,6,7,8,9"
-
- tile_service = NexusTileService()
- tile_ids = [tile.tile_id for tile in
- tile_service.find_tiles_in_polygon(polygon, primary_ds,
start_time, end_time, fetch_data=False,
- fl='id')]
- result = spark_matchup_driver(tile_ids, wkt.dumps(polygon),
primary_ds, matchup_ds, parameter, time_tolerance,
- depth_tolerance, radius_tolerance,
platforms)
- for k, v in result.items():
- print("primary: %s\n\tmatches:\n\t\t%s" % (
- "lon: %s, lat: %s, time: %s, sst: %s" % (k.longitude,
k.latitude, k.time, k.sst),
- '\n\t\t'.join(
- ["lon: %s, lat: %s, time: %s, sst: %s" % (i.longitude,
i.latitude, i.time, i.sst) for i in v])))
-
- def test_ascatb_match(self):
- from shapely.wkt import loads
- from nexustiles.nexustiles import NexusTileService
-
- polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00,
-34.98 31.00, -34.98 29.54))")
- primary_ds = "ASCATB-L2-Coastal"
- matchup_ds = "spurs"
- parameter = "wind"
- start_time = 1351468800 # 2012-10-29T00:00:00Z
- end_time = 1351555200 # 2012-10-30T00:00:00Z
- time_tolerance = 86400
- depth_tolerance = 5.0
- radius_tolerance = 110000.0 # 110 km
- platforms = "1,2,3,4,5,6,7,8,9"
-
- tile_service = NexusTileService()
- tile_ids = [tile.tile_id for tile in
- tile_service.find_tiles_in_polygon(polygon, primary_ds,
start_time, end_time, fetch_data=False,
- fl='id')]
- result = spark_matchup_driver(tile_ids, wkt.dumps(polygon),
primary_ds, matchup_ds, parameter, time_tolerance,
- depth_tolerance, radius_tolerance,
platforms)
- for k, v in result.items():
- print("primary: %s\n\tmatches:\n\t\t%s" % (
- "lon: %s, lat: %s, time: %s, wind u,v: %s,%s" % (k.longitude,
k.latitude, k.time, k.wind_u, k.wind_v),
- '\n\t\t'.join(
- ["lon: %s, lat: %s, time: %s, wind u,v: %s,%s" % (
- i.longitude, i.latitude, i.time, i.wind_u, i.wind_v)
for i in v])))
diff --git a/analysis/tests/algorithms_spark/test_matchup.py
b/analysis/tests/algorithms_spark/test_matchup.py
new file mode 100644
index 0000000..1fcb5ce
--- /dev/null
+++ b/analysis/tests/algorithms_spark/test_matchup.py
@@ -0,0 +1,357 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import json
+import os
+import pickle
+from datetime import datetime, timezone
+
+import mock
+import numpy as np
+import pytest
+import webservice.algorithms_spark.Matchup as matchup
+from nexustiles.model.nexusmodel import Tile
+from pyspark.sql import SparkSession
+from shapely import wkt
+from shapely.geometry import box
+from webservice.algorithms_spark.Matchup import DomsPoint, Matchup
+
+
[email protected](scope='function')
+def test_dir():
+ test_dir = os.path.dirname(os.path.realpath(__file__))
+ test_data_dir = os.path.join(test_dir, '..', 'data')
+ yield test_data_dir
+
+
+def test_doms_point_is_pickleable():
+ edge_point = {
+ 'id': 'argo-profiles-5903995(46, 0)',
+ 'time': '2012-10-15T14:24:04Z',
+ 'point': '-33.467 29.728',
+ 'sea_water_temperature': 24.5629997253,
+ 'sea_water_temperature_depth': 2.9796258642,
+ 'wind_speed': None,
+ 'sea_water_salinity': None,
+ 'sea_water_salinity_depth': None,
+ 'platform': 4,
+ 'device': 3,
+ 'fileurl':
'ftp://podaac-ftp.jpl.nasa.gov/allData/argo-profiles-5903995.nc'
+ }
+ point = DomsPoint.from_edge_point(edge_point)
+ assert pickle.dumps(point) is not None
+
+
+def test_calc():
+ """
+ Assert that the expected functions are called during the matchup
+ calculation and that the results are formatted as expected.
+ """
+    # Mock anything that connects to external dependencies (Solr, Cassandra, ...)
+ tile_service_factory = mock.MagicMock()
+ tile_service = mock.MagicMock()
+ tile_service_factory.return_value = tile_service
+ spark = SparkSession.builder.appName('nexus-analysis').getOrCreate()
+ spark_context = spark.sparkContext
+ request = mock.MagicMock()
+ request.get_argument.return_value = '1,2,3,4'
+
+ # Patch in request arguments
+ start_time = datetime.strptime('2020-01-01T00:00:00',
'%Y-%m-%dT%H:%M:%S').replace(
+ tzinfo=timezone.utc)
+ end_time = datetime.strptime('2020-02-01T00:00:00',
'%Y-%m-%dT%H:%M:%S').replace(
+ tzinfo=timezone.utc)
+ polygon_wkt = 'POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98
31.00, -34.98 29.54))'
+ args = {
+ 'bounding_polygon': wkt.loads(polygon_wkt),
+ 'primary_ds_name': 'primary-ds-name',
+ 'matchup_ds_names': 'matchup-ds-name',
+ 'parameter_s': 'sst',
+ 'start_time': start_time,
+ 'start_seconds_from_epoch': start_time.timestamp(),
+ 'end_time': end_time,
+ 'end_seconds_from_epoch': end_time.timestamp(),
+ 'depth_min': 1.0,
+ 'depth_max': 2.0,
+ 'time_tolerance': 3.0,
+ 'radius_tolerance': 4.0,
+ 'platforms': '1,2,3,4,5,6,7,8,9',
+ 'match_once': True,
+ 'result_size_limit': 10
+ }
+
+ def generate_fake_tile(tile_id):
+ tile = Tile()
+ tile.tile_id = tile_id
+ return tile
+
+ # Mock tiles
+ fake_tiles = [generate_fake_tile(idx) for idx in range(10)]
+ tile_service.find_tiles_in_polygon.return_value = fake_tiles
+
+ # Mock result
+ # Format of 'spark_result': keys=domspoint,values=list of domspoint
+
+ doms_point_args = {
+ 'longitude': -180,
+ 'latitude': -90,
+ 'time': '2020-01-15T00:00:00Z'
+ }
+ d1_sat = DomsPoint(**doms_point_args)
+ d2_sat = DomsPoint(**doms_point_args)
+ d1_ins = DomsPoint(**doms_point_args)
+ d2_ins = DomsPoint(**doms_point_args)
+
+ d1_sat.satellite_var_name = 'sea_surface_temperature'
+ d2_sat.satellite_var_name = 'sea_surface_temperature'
+ d1_ins.satellite_var_name = 'sea_surface_temperature'
+ d2_ins.satellite_var_name = 'sea_surface_temperature'
+
+ d1_sat.satellite_var_value = 10.0
+ d2_sat.satellite_var_value = 20.0
+ d1_ins.satellite_var_value = 30.0
+ d2_ins.satellite_var_value = 40.0
+
+ fake_spark_result = {
+ d1_sat: [d1_ins, d2_ins],
+ d2_sat: [d1_ins, d2_ins],
+ }
+
+ matchup_obj = Matchup(tile_service_factory=tile_service_factory,
sc=spark_context)
+ matchup_obj.parse_arguments = lambda _: [item for item in args.values()]
+
+ with mock.patch('webservice.algorithms_spark.Matchup.ResultsStorage') as
mock_rs, \
+ mock.patch(
+ 'webservice.algorithms_spark.Matchup.spark_matchup_driver') as
mock_matchup_driver:
+ mock_rs.insertExecution.return_value = 1
+ mock_matchup_driver.return_value = fake_spark_result
+ matchup_result = matchup_obj.calc(request)
+
+ # Ensure the call to 'spark_matchup_driver' contains the expected
params
+ assert len(mock_matchup_driver.call_args_list) == 1
+ matchup_driver_args = mock_matchup_driver.call_args_list[0].args
+ matchup_driver_kwargs = mock_matchup_driver.call_args_list[0].kwargs
+ assert matchup_driver_args[0] == [tile.tile_id for tile in fake_tiles]
+ assert wkt.loads(matchup_driver_args[1]).equals(wkt.loads(polygon_wkt))
+ assert matchup_driver_args[2] == args['primary_ds_name']
+ assert matchup_driver_args[3] == args['matchup_ds_names']
+ assert matchup_driver_args[4] == args['parameter_s']
+ assert matchup_driver_args[5] == args['depth_min']
+ assert matchup_driver_args[6] == args['depth_max']
+ assert matchup_driver_args[7] == args['time_tolerance']
+ assert matchup_driver_args[8] == args['radius_tolerance']
+ assert matchup_driver_args[9] == args['platforms']
+ assert matchup_driver_args[10] == args['match_once']
+ assert matchup_driver_args[11] == tile_service_factory
+ assert matchup_driver_kwargs['sc'] == spark_context
+
+ # Ensure the result of the matchup calculation is as expected
+
+ json_matchup_result = json.loads(matchup_result.toJson())
+ assert len(json_matchup_result['data']) == 2
+ assert len(json_matchup_result['data'][0]['matches']) == 2
+ assert len(json_matchup_result['data'][1]['matches']) == 2
+
+ for data in json_matchup_result['data']:
+ assert data['x'] == '-180'
+ assert data['y'] == '-90'
+ for matches in data['matches']:
+ assert matches['x'] == '-180'
+ assert matches['y'] == '-90'
+
+ assert json_matchup_result['data'][0]['sea_surface_temperature'] ==
10.0
+ assert json_matchup_result['data'][1]['sea_surface_temperature'] ==
20.0
+ assert
json_matchup_result['data'][0]['matches'][0]['sea_surface_temperature'] == 30.0
+ assert
json_matchup_result['data'][0]['matches'][1]['sea_surface_temperature'] == 40.0
+ assert
json_matchup_result['data'][1]['matches'][0]['sea_surface_temperature'] == 30.0
+ assert
json_matchup_result['data'][1]['matches'][1]['sea_surface_temperature'] == 40.0
+
+ assert json_matchup_result['details']['numInSituMatched'] == 4
+ assert json_matchup_result['details']['numGriddedMatched'] == 2
+
+
+def test_match_satellite_to_insitu(test_dir):
+ """
+    Test match_satellite_to_insitu and ensure the matchup is
+ done as expected, where the tile points and in-situ points are all
+ known and the expected matchup points have been hand-calculated.
+
+ This test case mocks out all external dependencies, so Solr,
+ Cassandra, HTTP insitu requests, etc are all mocked.
+
+ The test points are as follows:
+
+ X (0, 20) X (20, 20)
+
+ O (5, 15)
+
+
+
+ O (10, 10)
+
+
+
+
+ O (18, 3)
+
+ X (0, 0) X (20, 0)
+
+ The 'X' points are the primary satellite points and the 'O' points
+ are the secondary satellite or insitu points
+
+ Visual inspection reveals that primary point (0, 20) should match
+ with secondary point (5, 15) and primary point (20, 0) should match
+ with (18, 3)
+ """
+ tile = Tile()
+ tile.tile_id = 1
+ tile.tile_min_lat = 0
+ tile.tile_max_lat = 20
+ tile.tile_min_lon = 0
+ tile.tile_max_lon = 20
+ tile.dataset = 'test-dataset'
+ tile.dataset_id = 123
+ tile.granule = 'test-granule-name'
+ tile.min_time = '2020-07-28T00:00:00'
+ tile.max_time = '2020-07-28T00:00:00'
+ tile.section_spec = 'test-section-spec'
+ tile.var_name = 'sea_surface_temperature'
+ tile.latitudes = np.array([0, 20], dtype=np.float32)
+ tile.longitudes = np.array([0, 20], dtype=np.float32)
+ tile.times = [1627490285]
+ tile.data = np.array([[[11.0, 21.0], [31.0, 41.0]]])
+ tile.get_indices = lambda: [[0, 0, 0], [0, 0, 1], [0, 1, 0], [0, 1, 1]]
+ tile.meta_data = {'wind_type': []}
+
+ tile_service_factory = mock.MagicMock()
+ tile_service = mock.MagicMock()
+ tile_service_factory.return_value = tile_service
+ tile_service.get_bounding_box.return_value = box(-90, -45, 90, 45)
+ tile_service.get_min_time.return_value = 1627490285
+ tile_service.get_max_time.return_value = 1627490285
+ tile_service.mask_tiles_to_polygon.return_value = [tile]
+
+ tile_ids = [1]
+ polygon_wkt = 'POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98
31.00, -34.98 29.54))'
+ primary_ds_name = 'primary-ds-name'
+ matchup_ds_names = 'test'
+ parameter = 'sst'
+ depth_min = 0.0
+ depth_max = 1.0
+ time_tolerance = 3.0
+ radius_tolerance = 1000000.0
+ platforms = '1,2,3,4,5,6,7,8,9'
+
+ class MockSparkParam:
+ def __init__(self, value):
+ self.value = value
+
+ with
mock.patch('webservice.algorithms_spark.Matchup.edge_endpoints.getEndpointByName')
as mock_edge_endpoints:
+ # Test the satellite->insitu branch
+ # By mocking the getEndpointsByName function we are forcing
+ # Matchup to think this dummy matchup dataset is an insitu
+ # dataset
+ mock_edge_endpoints.return_value = {'url': 'http://test-edge-url'}
+ matchup.query_edge = lambda *args, **kwargs: json.load(
+ open(os.path.join(test_dir, 'edge_response.json')))
+
+ match_args = dict(
+ tile_ids=tile_ids,
+ primary_b=MockSparkParam(primary_ds_name),
+ matchup_b=MockSparkParam(matchup_ds_names),
+ parameter_b=MockSparkParam(parameter),
+ tt_b=MockSparkParam(time_tolerance),
+ rt_b=MockSparkParam(radius_tolerance),
+ platforms_b=MockSparkParam(platforms),
+ bounding_wkt_b=MockSparkParam(polygon_wkt),
+ depth_min_b=MockSparkParam(depth_min),
+ depth_max_b=MockSparkParam(depth_max),
+ tile_service_factory=tile_service_factory
+ )
+
+ generator = matchup.match_satellite_to_insitu(**match_args)
+
+ def validate_matchup_result(matchup_result, insitu_matchup):
+ """
+ The matchup results for satellite->insitu vs
+ satellite->satellite are almost exactly the same so they
+ can be validated using the same logic. They are the same
+ because they represent the same data, except one test is in
+ tile format (sat to sat) and one is in edge point format
+ (insitu). The only difference is the data field is different
+ for satellite data.
+ """
+ # There should be two primary matchup points
+ assert len(matchup_result) == 2
+        # Each entry is a (primary, matchup) pair, hence length 2
+ assert len(matchup_result[0]) == 2
+ assert len(matchup_result[1]) == 2
+ # Check that the satellite point was matched to the expected
insitu point
+ assert matchup_result[0][1].latitude == 3.0
+ assert matchup_result[0][1].longitude == 18.0
+ assert matchup_result[1][1].latitude == 15.0
+ assert matchup_result[1][1].longitude == 5.0
+ # Check that the insitu points have the expected values
+ if insitu_matchup:
+ assert matchup_result[0][1].sst == 30.0
+ assert matchup_result[1][1].sst == 10.0
+ else:
+ assert matchup_result[0][1].satellite_var_value == 30.0
+ assert matchup_result[1][1].satellite_var_value == 10.0
+ # Check that the satellite points have the expected values
+ assert matchup_result[0][0].satellite_var_value == 21.0
+ assert matchup_result[1][0].satellite_var_value == 31.0
+
+ insitu_matchup_result = list(generator)
+ validate_matchup_result(insitu_matchup_result, insitu_matchup=True)
+
+ # Test the satellite->satellite branch
+ # By mocking the getEndpointsByName function to return None we
+ # are forcing Matchup to think this dummy matchup dataset is
+ # satellite dataset
+ mock_edge_endpoints.return_value = None
+
+ # Open the edge response json. We want to convert these points
+ # to tile points so we can test sat to sat matchup
+ edge_json = json.load(open(os.path.join(test_dir,
'edge_response.json')))
+ points = [wkt.loads(result['point']) for result in
edge_json['results']]
+
+ matchup_tile = Tile()
+ matchup_tile.tile_id = 1
+ matchup_tile.tile_min_lat = 3
+ matchup_tile.tile_max_lat = 15
+ matchup_tile.tile_min_lon = 5
+ matchup_tile.tile_max_lon = 18
+ matchup_tile.dataset = 'test-dataset'
+ matchup_tile.dataset_id = 123
+ matchup_tile.granule = 'test-granule-name'
+ matchup_tile.min_time = '2020-07-28T00:00:00'
+ matchup_tile.max_time = '2020-07-28T00:00:00'
+ matchup_tile.section_spec = 'test-section-spec'
+ matchup_tile.var_name = 'sea_surface_temperature'
+ matchup_tile.latitudes = np.array([point.y for point in points],
dtype=np.float32)
+ matchup_tile.longitudes = np.array([point.x for point in points],
dtype=np.float32)
+ matchup_tile.times = [edge_json['results'][0]['time']]
+ matchup_tile.data = np.array([[[10.0, 0, 0], [0, 20.0, 0], [0, 0,
30.0]]])
+ matchup_tile.get_indices = lambda: [[0, 0, 0], [0, 1, 1], [0, 2, 2]]
+ matchup_tile.meta_data = {'wind_type': []}
+
+ tile_service.find_tiles_in_polygon.return_value = [matchup_tile]
+
+ generator = matchup.match_satellite_to_insitu(**match_args)
+
+ sat_matchup_result = list(generator)
+ validate_matchup_result(sat_matchup_result, insitu_matchup=False)
diff --git a/analysis/tests/conftest.py b/analysis/tests/conftest.py
new file mode 100644
index 0000000..43af49c
--- /dev/null
+++ b/analysis/tests/conftest.py
@@ -0,0 +1,34 @@
+import logging
+import pytest
+import os
+
+from pyspark import HiveContext
+from pyspark import SparkConf
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+
+def quiet_py4j():
+ logger = logging.getLogger('py4j')
+ logger.setLevel(logging.WARN)
+
[email protected](scope="session")
+def spark_context(request):
+ conf =
(SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing"))
+ request.addfinalizer(lambda: sc.stop())
+
+ sc = SparkContext(conf=conf)
+ quiet_py4j()
+ return sc
+
[email protected](scope="session")
+def hive_context(spark_context):
+ return HiveContext(spark_context)
+
[email protected](scope="session")
+def streaming_context(spark_context):
+ return StreamingContext(spark_context, 1)
+
[email protected](scope="session")
+def setup_pyspark_env():
+ os.environ['PYSPARK_DRIVER_PYTHON'] =
'/usr/local/anaconda3/envs/nexus-messages3/bin/python'
+ os.environ['PYSPARK_PYTHON'] =
'/usr/local/anaconda3/envs/nexus-messages3/bin/python'
\ No newline at end of file
diff --git a/analysis/tests/data/edge_response.json
b/analysis/tests/data/edge_response.json
new file mode 100644
index 0000000..99be82d
--- /dev/null
+++ b/analysis/tests/data/edge_response.json
@@ -0,0 +1,24 @@
+{
+ "totalResults": 3,
+ "results": [
+ {
+ "point": "Point(5.0 15.0)",
+ "time": 1595954285,
+ "sea_water_temperature": 10.0,
+ "id": 1234
+ },
+ {
+ "point": "Point(10.0 10.0)",
+ "time": 1595954286,
+ "sea_water_temperature": 20.0,
+ "id": 1235
+ },
+ {
+ "point": "Point(18.0 3.0)",
+ "time": 1595954287,
+ "sea_water_temperature": 30.0,
+ "id": 1236
+ }
+ ]
+}
+
diff --git a/analysis/tests/integration/algorithms_spark/test_matchup.py
b/analysis/tests/integration/algorithms_spark/test_matchup.py
new file mode 100644
index 0000000..61bd04b
--- /dev/null
+++ b/analysis/tests/integration/algorithms_spark/test_matchup.py
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+import pytest
+from shapely import wkt
+from webservice.algorithms_spark.Matchup import *
+
+
@pytest.mark.integration
class TestMatchup(unittest.TestCase):
    """End-to-end matchup tests that exercise ``spark_matchup_driver`` against
    live backing services (Solr/Cassandra via NexusTileService, the edge
    in-situ service, and a local Spark install).

    NOTE(review): ``setUp`` hard-codes one developer's local conda and Spark
    paths — these must be adapted to the local environment before the tests
    can run.
    """

    def setUp(self):
        # Point Spark at a local Python 2.7 conda environment and Spark 2.0.0
        # install. NOTE(review): machine-specific paths — edit before running.
        from os import environ
        environ['PYSPARK_DRIVER_PYTHON'] = '/Users/greguska/anaconda/envs/nexus-analysis/bin/python2.7'
        environ['PYSPARK_PYTHON'] = '/Users/greguska/anaconda/envs/nexus-analysis/bin/python2.7'
        environ['SPARK_HOME'] = '/Users/greguska/sandbox/spark-2.0.0-bin-hadoop2.7'

    def test_mur_match(self):
        """Match MUR SST tiles against the 'spurs' in-situ dataset and print each
        primary point with its matched in-situ points."""
        from shapely.wkt import loads
        from nexustiles.nexustiles import NexusTileService

        polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))")
        primary_ds = "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1"
        matchup_ds = "spurs"
        parameter = "sst"
        start_time = 1350259200  # 2012-10-15T00:00:00Z
        end_time = 1350345600  # 2012-10-16T00:00:00Z
        time_tolerance = 86400
        depth_tolerance = 5.0
        radius_tolerance = 1500.0
        platforms = "1,2,3,4,5,6,7,8,9"

        # Only tile IDs are needed; data is fetched lazily inside the driver.
        tile_service = NexusTileService()
        tile_ids = [tile.tile_id for tile in
                    tile_service.find_tiles_in_polygon(polygon, primary_ds, start_time, end_time, fetch_data=False,
                                                       fl='id')]
        result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), primary_ds, matchup_ds, parameter, time_tolerance,
                                      depth_tolerance, radius_tolerance, platforms)
        # result maps each primary DomsPoint to its list of matched points.
        for k, v in result.items():
            print("primary: %s\n\tmatches:\n\t\t%s" % (
                "lon: %s, lat: %s, time: %s, sst: %s" % (k.longitude, k.latitude, k.time, k.sst),
                '\n\t\t'.join(
                    ["lon: %s, lat: %s, time: %s, sst: %s" % (i.longitude, i.latitude, i.time, i.sst) for i in v])))

    def test_smap_match(self):
        """Match SMAP sea-surface-salinity tiles against 'spurs' in-situ data.

        NOTE(review): the printout labels and reads ``.sst`` even though the
        matched parameter is "sss" — verify the DomsPoint attribute name.
        """
        from shapely.wkt import loads
        from nexustiles.nexustiles import NexusTileService

        polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))")
        primary_ds = "SMAP_L2B_SSS"
        matchup_ds = "spurs"
        parameter = "sss"
        start_time = 1350259200  # 2012-10-15T00:00:00Z
        end_time = 1350345600  # 2012-10-16T00:00:00Z
        time_tolerance = 86400
        depth_tolerance = 5.0
        radius_tolerance = 1500.0
        platforms = "1,2,3,4,5,6,7,8,9"

        tile_service = NexusTileService()
        tile_ids = [tile.tile_id for tile in
                    tile_service.find_tiles_in_polygon(polygon, primary_ds, start_time, end_time, fetch_data=False,
                                                       fl='id')]
        result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), primary_ds, matchup_ds, parameter, time_tolerance,
                                      depth_tolerance, radius_tolerance, platforms)
        for k, v in result.items():
            print("primary: %s\n\tmatches:\n\t\t%s" % (
                "lon: %s, lat: %s, time: %s, sst: %s" % (k.longitude, k.latitude, k.time, k.sst),
                '\n\t\t'.join(
                    ["lon: %s, lat: %s, time: %s, sst: %s" % (i.longitude, i.latitude, i.time, i.sst) for i in v])))

    def test_ascatb_match(self):
        """Match ASCAT-B coastal wind tiles against 'spurs' in-situ data using a
        wide (110 km) search radius, printing wind u/v components."""
        from shapely.wkt import loads
        from nexustiles.nexustiles import NexusTileService

        polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))")
        primary_ds = "ASCATB-L2-Coastal"
        matchup_ds = "spurs"
        parameter = "wind"
        start_time = 1351468800  # 2012-10-29T00:00:00Z
        end_time = 1351555200  # 2012-10-30T00:00:00Z
        time_tolerance = 86400
        depth_tolerance = 5.0
        radius_tolerance = 110000.0  # 110 km
        platforms = "1,2,3,4,5,6,7,8,9"

        tile_service = NexusTileService()
        tile_ids = [tile.tile_id for tile in
                    tile_service.find_tiles_in_polygon(polygon, primary_ds, start_time, end_time, fetch_data=False,
                                                       fl='id')]
        result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), primary_ds, matchup_ds, parameter, time_tolerance,
                                      depth_tolerance, radius_tolerance, platforms)
        for k, v in result.items():
            print("primary: %s\n\tmatches:\n\t\t%s" % (
                "lon: %s, lat: %s, time: %s, wind u,v: %s,%s" % (k.longitude, k.latitude, k.time, k.wind_u, k.wind_v),
                '\n\t\t'.join(
                    ["lon: %s, lat: %s, time: %s, wind u,v: %s,%s" % (
                        i.longitude, i.latitude, i.time, i.wind_u, i.wind_v) for i in v])))
diff --git a/analysis/webservice/algorithms/doms/BaseDomsHandler.py
b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
index c91c883..dbfc692 100644
--- a/analysis/webservice/algorithms/doms/BaseDomsHandler.py
+++ b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
@@ -74,6 +74,8 @@ class DomsEncoder(json.JSONEncoder):
return int((obj - EPOCH).total_seconds())
elif isinstance(obj, Decimal):
return str(obj)
+ elif isinstance(obj, np.float32):
+ return float(obj)
else:
return json.JSONEncoder.default(self, obj)
diff --git a/analysis/webservice/algorithms_spark/Matchup.py
b/analysis/webservice/algorithms_spark/Matchup.py
index a5277b3..f6275f4 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -18,6 +18,7 @@
import json
import logging
import threading
+from shapely.geometry import Polygon
from datetime import datetime
from itertools import chain
from math import cos, radians
@@ -434,6 +435,10 @@ class DomsPoint(object):
except KeyError:
point.data_id = "%s:%s:%s" % (point.time, point.longitude,
point.latitude)
+ if 'var_name' in edge_point and 'var_value' in edge_point:
+ point.satellite_var_name = edge_point['var_name']
+ point.satellite_var_value = edge_point['var_value']
+
return point
@@ -463,11 +468,25 @@ def spark_matchup_driver(tile_ids, bounding_wkt,
primary_ds_name, matchup_ds_nam
# Map Partitions ( list(tile_id) )
rdd_filtered = rdd.mapPartitions(
- partial(match_satellite_to_insitu, primary_b=primary_b,
matchup_b=matchup_b, parameter_b=parameter_b, tt_b=tt_b,
- rt_b=rt_b, platforms_b=platforms_b,
bounding_wkt_b=bounding_wkt_b, depth_min_b=depth_min_b,
- depth_max_b=depth_max_b,
tile_service_factory=tile_service_factory), preservesPartitioning=True) \
- .filter(lambda p_m_tuple: abs(
- iso_time_to_epoch(p_m_tuple[0].time) -
iso_time_to_epoch(p_m_tuple[1].time)) <= time_tolerance)
+ partial(
+ match_satellite_to_insitu,
+ primary_b=primary_b,
+ matchup_b=matchup_b,
+ parameter_b=parameter_b,
+ tt_b=tt_b,
+ rt_b=rt_b,
+ platforms_b=platforms_b,
+ bounding_wkt_b=bounding_wkt_b,
+ depth_min_b=depth_min_b,
+ depth_max_b=depth_max_b,
+ tile_service_factory=tile_service_factory
+ ),
+ preservesPartitioning=True
+ ).filter(
+ lambda p_m_tuple: abs(
+ iso_time_to_epoch(p_m_tuple[0].time) -
iso_time_to_epoch(p_m_tuple[1].time)
+ ) <= time_tolerance
+ )
if match_once:
# Only the 'nearest' point for each primary should be returned. Add an
extra map/reduce which calculates
@@ -523,6 +542,25 @@ def add_meters_to_lon_lat(lon, lat, meters):
return longitude, latitude
def tile_to_edge_points(tile):
    """Convert each valid observation in *tile* to an in-situ-style edge point.

    Each returned dict mirrors the shape of an edge-service result (WKT
    ``point``, ISO-8601 ``time``, ``source``, ``fileurl``, variable name and
    value) so satellite tiles can flow through the same matchup path as
    in-situ data. ``platform`` and ``device`` are None because a satellite
    tile has no in-situ platform/device metadata.
    """
    return [
        {
            'point': f'Point({tile.longitudes[idx[2]]} {tile.latitudes[idx[1]]})',
            # idx is a (time, lat, lon) index triple from tile.get_indices().
            'time': datetime.utcfromtimestamp(tile.times[idx[0]]).strftime('%Y-%m-%dT%H:%M:%SZ'),
            'source': tile.dataset,
            'platform': None,
            'device': None,
            'fileurl': tile.granule,
            'var_name': tile.var_name,
            'var_value': tile.data[tuple(idx)]
        }
        for idx in tile.get_indices()
    ]
+
+
def match_satellite_to_insitu(tile_ids, primary_b, matchup_b, parameter_b,
tt_b, rt_b, platforms_b,
bounding_wkt_b, depth_min_b, depth_max_b,
tile_service_factory):
the_time = datetime.now()
@@ -560,38 +598,77 @@ def match_satellite_to_insitu(tile_ids, primary_b,
matchup_b, parameter_b, tt_b,
str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]))
# Query edge for all points within the spatial-temporal extents of this
partition
- the_time = datetime.now()
- edge_session = requests.Session()
- edge_results = []
- with edge_session:
- for insitudata_name in matchup_b.value.split(','):
- bbox = ','.join(
- [str(matchup_min_lon), str(matchup_min_lat),
str(matchup_max_lon), str(matchup_max_lat)])
- edge_response = query_edge(insitudata_name, parameter_b.value,
matchup_min_time, matchup_max_time, bbox,
- platforms_b.value, depth_min_b.value,
depth_max_b.value, session=edge_session)
- if edge_response['totalResults'] == 0:
- continue
- r = edge_response['results']
- for p in r:
- p['source'] = insitudata_name
- edge_results.extend(r)
- print("%s Time to call edge for partition %s to %s" % (str(datetime.now()
- the_time), tile_ids[0], tile_ids[-1]))
- if len(edge_results) == 0:
- return []
+ is_insitu_dataset = edge_endpoints.getEndpointByName(matchup_b.value)
- # Convert edge points to utm
- the_time = datetime.now()
- matchup_points = np.ndarray((len(edge_results), 2), dtype=np.float32)
- for n, edge_point in enumerate(edge_results):
- try:
- x, y = wkt.loads(edge_point['point']).coords[0]
- except WKTReadingError:
+ if is_insitu_dataset:
+ the_time = datetime.now()
+ edge_session = requests.Session()
+ edge_results = []
+ with edge_session:
+ for insitudata_name in matchup_b.value.split(','):
+ bbox = ','.join(
+ [str(matchup_min_lon), str(matchup_min_lat),
str(matchup_max_lon), str(matchup_max_lat)])
+ edge_response = query_edge(insitudata_name, parameter_b.value,
matchup_min_time, matchup_max_time, bbox,
+ platforms_b.value,
depth_min_b.value, depth_max_b.value, session=edge_session)
+ if edge_response['totalResults'] == 0:
+ continue
+ r = edge_response['results']
+ for p in r:
+ p['source'] = insitudata_name
+ edge_results.extend(r)
+ print("%s Time to call edge for partition %s to %s" %
(str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]))
+ if len(edge_results) == 0:
+ return []
+
+ # Convert edge points to utm
+ the_time = datetime.now()
+ matchup_points = np.ndarray((len(edge_results), 2), dtype=np.float32)
+ for n, edge_point in enumerate(edge_results):
try:
- x, y = Point(*[float(c) for c in edge_point['point'].split('
')]).coords[0]
- except ValueError:
- y, x = Point(*[float(c) for c in
edge_point['point'].split(',')]).coords[0]
+ x, y = wkt.loads(edge_point['point']).coords[0]
+ except WKTReadingError:
+ try:
+ x, y = Point(*[float(c) for c in
edge_point['point'].split(' ')]).coords[0]
+ except ValueError:
+ y, x = Point(*[float(c) for c in
edge_point['point'].split(',')]).coords[0]
+
+ matchup_points[n][0], matchup_points[n][1] = aeqd_proj(x, y)
+ else:
+ # Query nexus (cassandra? solr?) to find matching points.
+ bbox = ','.join(
+ [str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon),
+ str(matchup_max_lat)])
+ west, south, east, north = [float(b) for b in bbox.split(",")]
+ polygon = Polygon(
+ [(west, south), (east, south), (east, north), (west, north),
(west, south)])
+
+ matchup_tiles = tile_service.find_tiles_in_polygon(
+ bounding_polygon=polygon,
+ ds=matchup_b.value,
+ start_time=matchup_min_time,
+ end_time=matchup_max_time,
+ fetch_data=True,
+ sort=['tile_min_time_dt asc', 'tile_min_lon asc', 'tile_min_lat
asc'],
+ rows=5000
+ )
+
+ # Convert Tile IDS to tiles and convert to UTM lat/lon projection.
+ matchup_points = []
+ for tile in matchup_tiles:
+ valid_indices = tile.get_indices()
+ primary_points = np.array([aeqd_proj(
+ tile.longitudes[aslice[2]],
+ tile.latitudes[aslice[1]]
+ ) for aslice in valid_indices])
+ matchup_points.extend(primary_points)
+
+ # Convert tiles to 'edge points' which match the format of in-situ
edge points.
+ edge_results = []
+ for matchup_tile in matchup_tiles:
+ edge_results.extend(tile_to_edge_points(matchup_tile))
+
+ matchup_points = np.array(matchup_points)
- matchup_points[n][0], matchup_points[n][1] = aeqd_proj(x, y)
print("%s Time to convert match points for partition %s to %s" % (
str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]))