This is an automated email from the ASF dual-hosted git repository.

nchung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/master by this push:
     new c07a689  SDAP-315: Updated matchup algorithm to support satellite to 
satellite (#131)
c07a689 is described below

commit c07a689edbb4ef529c3c747820b2b8bd19d83a51
Author: Stepheny Perez <[email protected]>
AuthorDate: Wed Aug 4 17:21:12 2021 -0700

    SDAP-315: Updated matchup algorithm to support satellite to satellite (#131)
    
    * SDAP-315: Updated matchup algorithm to support satellite to satellite
    
    * SDAP-315: Updated unit tests to a passing state. Moved integration tests 
to a new test directory
---
 analysis/tests/README.md                           |  23 ++
 analysis/tests/algorithms_spark/Matchup_test.py    | 321 ------------------
 analysis/tests/algorithms_spark/test_matchup.py    | 357 +++++++++++++++++++++
 analysis/tests/conftest.py                         |  34 ++
 analysis/tests/data/edge_response.json             |  24 ++
 .../integration/algorithms_spark/test_matchup.py   | 111 +++++++
 .../webservice/algorithms/doms/BaseDomsHandler.py  |   2 +
 analysis/webservice/algorithms_spark/Matchup.py    | 145 +++++++--
 8 files changed, 662 insertions(+), 355 deletions(-)

diff --git a/analysis/tests/README.md b/analysis/tests/README.md
new file mode 100644
index 0000000..a0f5cb9
--- /dev/null
+++ b/analysis/tests/README.md
@@ -0,0 +1,23 @@
+# Apache SDAP Testing
+
+## Unit Tests
+
+Unit tests don't contain any externally running dependencies (like Solr 
+or Cassandra). Unit tests, unlike integration tests, do not contain an 
+'integration' pytest marker. To run all unit tests and skip integration 
+tests, run the following command:
+
+```shell script
+pytest -m "not integration" analysis/tests/
+```
+
+## Integration Tests
+
+To run integration tests, Cassandra and Solr must be running locally.
+
+Integration tests have been marked with 'integration' pytest markers. 
+In order to run only integration tests, run the following command:
+
+```shell script
+pytest -m "integration" analysis/tests/
+```
diff --git a/analysis/tests/algorithms_spark/Matchup_test.py 
b/analysis/tests/algorithms_spark/Matchup_test.py
deleted file mode 100644
index bb39e70..0000000
--- a/analysis/tests/algorithms_spark/Matchup_test.py
+++ /dev/null
@@ -1,321 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import pickle
-import random
-import timeit
-import unittest
-
-from webservice.algorithms_spark.Matchup import *
-
-
-class TestMatch_Points(unittest.TestCase):
-    def test_one_point_match_exact(self):
-        primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, 
data_id=1)
-        matchup = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, 
data_id=2)
-
-        primary_points = [primary]
-        matchup_points = [matchup]
-
-        matches = list(match_points_generator(primary_points, matchup_points, 
0))
-
-        self.assertEqual(1, len(matches))
-
-        p_match_point, match = matches[0]
-
-        self.assertEqual(primary, p_match_point)
-        self.assertEqual(matchup, match)
-
-    def test_one_point_match_within_tolerance_150km(self):
-        primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, 
data_id=1)
-        matchup = DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=5.0, 
data_id=2)
-
-        primary_points = [primary]
-        matchup_points = [matchup]
-
-        matches = list(match_points_generator(primary_points, matchup_points, 
150000))  # tolerance 150 km
-
-        self.assertEqual(1, len(matches))
-
-        p_match_point, match = matches[0]
-
-        self.assertEqual(primary, p_match_point)
-        self.assertEqual(matchup, match)
-
-    def test_one_point_match_within_tolerance_200m(self):
-        primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, 
data_id=1)
-        matchup = DomsPoint(longitude=1.001, latitude=2.0, time=1000, 
depth=5.0, data_id=2)
-
-        primary_points = [primary]
-        matchup_points = [matchup]
-
-        matches = list(match_points_generator(primary_points, matchup_points, 
200))  # tolerance 200 m
-
-        self.assertEqual(1, len(matches))
-
-        p_match_point, match = matches[0]
-
-        self.assertEqual(primary, p_match_point)
-        self.assertEqual(matchup, match)
-
-    def test_one_point_not_match_tolerance_150km(self):
-        primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, 
data_id=1)
-        matchup = DomsPoint(longitude=1.0, latitude=4.0, time=1000, depth=5.0, 
data_id=2)
-
-        primary_points = [primary]
-        matchup_points = [matchup]
-
-        matches = list(match_points_generator(primary_points, matchup_points, 
150000))  # tolerance 150 km
-
-        self.assertEqual(0, len(matches))
-
-    def test_one_point_not_match_tolerance_100m(self):
-        primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, 
data_id=1)
-        matchup = DomsPoint(longitude=1.001, latitude=2.0, time=1000, 
depth=5.0, data_id=2)
-
-        primary_points = [primary]
-        matchup_points = [matchup]
-
-        matches = list(match_points_generator(primary_points, matchup_points, 
100))  # tolerance 100 m
-
-        self.assertEqual(0, len(matches))
-
-    def test_multiple_point_match(self):
-        primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, 
data_id=1)
-        primary_points = [primary]
-
-        matchup_points = [
-            DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=10.0, 
data_id=2),
-            DomsPoint(longitude=2.0, latitude=2.0, time=1000, depth=0.0, 
data_id=3),
-            DomsPoint(longitude=0.5, latitude=1.5, time=1000, depth=3.0, 
data_id=4)
-        ]
-
-        matches = list(match_points_generator(primary_points, matchup_points, 
150000))  # tolerance 150 km
-
-        self.assertEqual(3, len(matches))
-
-        self.assertSetEqual({primary}, {x[0] for x in matches})
-
-        list_of_matches = [x[1] for x in matches]
-
-        self.assertEqual(3, len(list_of_matches))
-        self.assertItemsEqual(matchup_points, list_of_matches)
-
-    def test_multiple_point_match_multiple_times(self):
-        primary_points = [
-            DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, 
data_id=1),
-            DomsPoint(longitude=1.5, latitude=1.5, time=1000, depth=5.0, 
data_id=2)
-        ]
-
-        matchup_points = [
-            DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=10.0, 
data_id=3),
-            DomsPoint(longitude=2.0, latitude=2.0, time=1000, depth=0.0, 
data_id=4),
-            DomsPoint(longitude=0.5, latitude=1.5, time=1000, depth=3.0, 
data_id=5)
-        ]
-
-        matches = list(match_points_generator(primary_points, matchup_points, 
150000))  # tolerance 150 km
-
-        self.assertEqual(5, len(matches))
-
-        self.assertSetEqual({p for p in primary_points}, {x[0] for x in 
matches})
-
-        # First primary point matches all 3 secondary
-        self.assertEqual(3, [x[0] for x in matches].count(primary_points[0]))
-        self.assertItemsEqual(matchup_points, [x[1] for x in matches if x[0] 
== primary_points[0]])
-
-        # Second primary point matches only last 2 secondary
-        self.assertEqual(2, [x[0] for x in matches].count(primary_points[1]))
-        self.assertItemsEqual(matchup_points[1:], [x[1] for x in matches if 
x[0] == primary_points[1]])
-
-    def test_one_of_many_primary_matches_one_of_many_matchup(self):
-        primary_points = [
-            DomsPoint(longitude=-33.76764, latitude=30.42946, time=1351553994, 
data_id=1),
-            DomsPoint(longitude=-33.75731, latitude=29.86216, time=1351554004, 
data_id=2)
-        ]
-
-        matchup_points = [
-            DomsPoint(longitude=-33.762, latitude=28.877, time=1351521432, 
depth=3.973, data_id=3),
-            DomsPoint(longitude=-34.916, latitude=28.879, time=1351521770, 
depth=2.9798, data_id=4),
-            DomsPoint(longitude=-31.121, latitude=31.256, time=1351519892, 
depth=4.07, data_id=5)
-        ]
-
-        matches = list(match_points_generator(primary_points, matchup_points, 
110000))  # tolerance 110 km
-
-        self.assertEqual(1, len(matches))
-
-        self.assertSetEqual({p for p in primary_points if p.data_id == 2}, 
{x[0] for x in matches})
-
-        # First primary point matches none
-        self.assertEqual(0, [x[0] for x in matches].count(primary_points[0]))
-
-        # Second primary point matches only first secondary
-        self.assertEqual(1, [x[0] for x in matches].count(primary_points[1]))
-        self.assertItemsEqual(matchup_points[0:1], [x[1] for x in matches if 
x[0] == primary_points[1]])
-
-    @unittest.skip("This test is just for timing, doesn't actually assert 
anything.")
-    def test_time_many_primary_many_matchup(self):
-        import logging
-        import sys
-        logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - 
%(name)s - %(levelname)s - %(message)s',
-                            datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
-        log = logging.getLogger(__name__)
-        # Generate 160000 DomsPoints distributed equally in a box from -2.0 
lat/lon to 2.0 lat/lon
-        log.info("Generating primary points")
-        x = np.arange(-2.0, 2.0, 0.01)
-        y = np.arange(-2.0, 2.0, 0.01)
-        primary_points = [DomsPoint(longitude=xy[0], latitude=xy[1], 
time=1000, depth=5.0, data_id=i) for i, xy in
-                          enumerate(np.array(np.meshgrid(x, y)).T.reshape(-1, 
2))]
-
-        # Generate 2000 DomsPoints distributed randomly in a box from -2.0 
lat/lon to 2.0 lat/lon
-        log.info("Generating matchup points")
-        matchup_points = [
-            DomsPoint(longitude=random.uniform(-2.0, 2.0), 
latitude=random.uniform(-2.0, 2.0), time=1000, depth=5.0,
-                      data_id=i) for i in range(0, 2000)]
-
-        log.info("Starting matchup")
-        log.info("Best of repeat(3, 2) matchups: %s seconds" % min(
-            timeit.repeat(lambda: list(match_points_generator(primary_points, 
matchup_points, 1500)), repeat=3,
-                          number=2)))
-
-
-class TestDOMSPoint(unittest.TestCase):
-    def test_is_pickleable(self):
-        edge_point = json.loads("""{
-"id": "argo-profiles-5903995(46, 0)",
-"time": "2012-10-15T14:24:04Z",
-"point": "-33.467 29.728",
-"sea_water_temperature": 24.5629997253,
-"sea_water_temperature_depth": 2.9796258642,
-"wind_speed": null,
-"sea_water_salinity": null,
-"sea_water_salinity_depth": null,
-"platform": 4,
-"device": 3,
-"fileurl": 
"ftp://podaac-ftp.jpl.nasa.gov/allData/insitu/L2/spurs1/argo/argo-profiles-5903995.nc";
-}""")
-        point = DomsPoint.from_edge_point(edge_point)
-        self.assertIsNotNone(pickle.dumps(point))
-
-
-def check_all():
-    return check_solr() and check_cass() and check_edge()
-
-
-def check_solr():
-    # TODO eventually this might do something.
-    return False
-
-
-def check_cass():
-    # TODO eventually this might do something.
-    return False
-
-
-def check_edge():
-    # TODO eventually this might do something.
-    return False
-
-
[email protected](check_all(),
-                     "These tests require local instances of Solr, Cassandra, 
and Edge to be running.")
-class TestMatchup(unittest.TestCase):
-    def setUp(self):
-        from os import environ
-        environ['PYSPARK_DRIVER_PYTHON'] = 
'/Users/greguska/anaconda/envs/nexus-analysis/bin/python2.7'
-        environ['PYSPARK_PYTHON'] = 
'/Users/greguska/anaconda/envs/nexus-analysis/bin/python2.7'
-        environ['SPARK_HOME'] = 
'/Users/greguska/sandbox/spark-2.0.0-bin-hadoop2.7'
-
-    def test_mur_match(self):
-        from shapely.wkt import loads
-        from nexustiles.nexustiles import NexusTileService
-
-        polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, 
-34.98 31.00, -34.98 29.54))")
-        primary_ds = "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1"
-        matchup_ds = "spurs"
-        parameter = "sst"
-        start_time = 1350259200  # 2012-10-15T00:00:00Z
-        end_time = 1350345600  # 2012-10-16T00:00:00Z
-        time_tolerance = 86400
-        depth_tolerance = 5.0
-        radius_tolerance = 1500.0
-        platforms = "1,2,3,4,5,6,7,8,9"
-
-        tile_service = NexusTileService()
-        tile_ids = [tile.tile_id for tile in
-                    tile_service.find_tiles_in_polygon(polygon, primary_ds, 
start_time, end_time, fetch_data=False,
-                                                       fl='id')]
-        result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), 
primary_ds, matchup_ds, parameter, time_tolerance,
-                                      depth_tolerance, radius_tolerance, 
platforms)
-        for k, v in result.items():
-            print("primary: %s\n\tmatches:\n\t\t%s" % (
-                "lon: %s, lat: %s, time: %s, sst: %s" % (k.longitude, 
k.latitude, k.time, k.sst),
-                '\n\t\t'.join(
-                    ["lon: %s, lat: %s, time: %s, sst: %s" % (i.longitude, 
i.latitude, i.time, i.sst) for i in v])))
-
-    def test_smap_match(self):
-        from shapely.wkt import loads
-        from nexustiles.nexustiles import NexusTileService
-
-        polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, 
-34.98 31.00, -34.98 29.54))")
-        primary_ds = "SMAP_L2B_SSS"
-        matchup_ds = "spurs"
-        parameter = "sss"
-        start_time = 1350259200  # 2012-10-15T00:00:00Z
-        end_time = 1350345600  # 2012-10-16T00:00:00Z
-        time_tolerance = 86400
-        depth_tolerance = 5.0
-        radius_tolerance = 1500.0
-        platforms = "1,2,3,4,5,6,7,8,9"
-
-        tile_service = NexusTileService()
-        tile_ids = [tile.tile_id for tile in
-                    tile_service.find_tiles_in_polygon(polygon, primary_ds, 
start_time, end_time, fetch_data=False,
-                                                       fl='id')]
-        result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), 
primary_ds, matchup_ds, parameter, time_tolerance,
-                                      depth_tolerance, radius_tolerance, 
platforms)
-        for k, v in result.items():
-            print("primary: %s\n\tmatches:\n\t\t%s" % (
-                "lon: %s, lat: %s, time: %s, sst: %s" % (k.longitude, 
k.latitude, k.time, k.sst),
-                '\n\t\t'.join(
-                    ["lon: %s, lat: %s, time: %s, sst: %s" % (i.longitude, 
i.latitude, i.time, i.sst) for i in v])))
-
-    def test_ascatb_match(self):
-        from shapely.wkt import loads
-        from nexustiles.nexustiles import NexusTileService
-
-        polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, 
-34.98 31.00, -34.98 29.54))")
-        primary_ds = "ASCATB-L2-Coastal"
-        matchup_ds = "spurs"
-        parameter = "wind"
-        start_time = 1351468800  # 2012-10-29T00:00:00Z
-        end_time = 1351555200  # 2012-10-30T00:00:00Z
-        time_tolerance = 86400
-        depth_tolerance = 5.0
-        radius_tolerance = 110000.0  # 110 km
-        platforms = "1,2,3,4,5,6,7,8,9"
-
-        tile_service = NexusTileService()
-        tile_ids = [tile.tile_id for tile in
-                    tile_service.find_tiles_in_polygon(polygon, primary_ds, 
start_time, end_time, fetch_data=False,
-                                                       fl='id')]
-        result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), 
primary_ds, matchup_ds, parameter, time_tolerance,
-                                      depth_tolerance, radius_tolerance, 
platforms)
-        for k, v in result.items():
-            print("primary: %s\n\tmatches:\n\t\t%s" % (
-                "lon: %s, lat: %s, time: %s, wind u,v: %s,%s" % (k.longitude, 
k.latitude, k.time, k.wind_u, k.wind_v),
-                '\n\t\t'.join(
-                    ["lon: %s, lat: %s, time: %s, wind u,v: %s,%s" % (
-                        i.longitude, i.latitude, i.time, i.wind_u, i.wind_v) 
for i in v])))
diff --git a/analysis/tests/algorithms_spark/test_matchup.py 
b/analysis/tests/algorithms_spark/test_matchup.py
new file mode 100644
index 0000000..1fcb5ce
--- /dev/null
+++ b/analysis/tests/algorithms_spark/test_matchup.py
@@ -0,0 +1,357 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import json
+import os
+import pickle
+from datetime import datetime, timezone
+
+import mock
+import numpy as np
+import pytest
+import webservice.algorithms_spark.Matchup as matchup
+from nexustiles.model.nexusmodel import Tile
+from pyspark.sql import SparkSession
+from shapely import wkt
+from shapely.geometry import box
+from webservice.algorithms_spark.Matchup import DomsPoint, Matchup
+
+
[email protected](scope='function')
+def test_dir():
+    test_dir = os.path.dirname(os.path.realpath(__file__))
+    test_data_dir = os.path.join(test_dir, '..', 'data')
+    yield test_data_dir
+
+
+def test_doms_point_is_pickleable():
+    edge_point = {
+        'id': 'argo-profiles-5903995(46, 0)',
+        'time': '2012-10-15T14:24:04Z',
+        'point': '-33.467 29.728',
+        'sea_water_temperature': 24.5629997253,
+        'sea_water_temperature_depth': 2.9796258642,
+        'wind_speed': None,
+        'sea_water_salinity': None,
+        'sea_water_salinity_depth': None,
+        'platform': 4,
+        'device': 3,
+        'fileurl': 
'ftp://podaac-ftp.jpl.nasa.gov/allData/argo-profiles-5903995.nc'
+    }
+    point = DomsPoint.from_edge_point(edge_point)
+    assert pickle.dumps(point) is not None
+
+
+def test_calc():
+    """
+    Assert that the expected functions are called during the matchup
+    calculation and that the results are formatted as expected.
+    """
+    # Mock anything that connects external dependence (Solr, Cassandra, ...)
+    tile_service_factory = mock.MagicMock()
+    tile_service = mock.MagicMock()
+    tile_service_factory.return_value = tile_service
+    spark = SparkSession.builder.appName('nexus-analysis').getOrCreate()
+    spark_context = spark.sparkContext
+    request = mock.MagicMock()
+    request.get_argument.return_value = '1,2,3,4'
+
+    # Patch in request arguments
+    start_time = datetime.strptime('2020-01-01T00:00:00', 
'%Y-%m-%dT%H:%M:%S').replace(
+        tzinfo=timezone.utc)
+    end_time = datetime.strptime('2020-02-01T00:00:00', 
'%Y-%m-%dT%H:%M:%S').replace(
+        tzinfo=timezone.utc)
+    polygon_wkt = 'POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 
31.00, -34.98 29.54))'
+    args = {
+        'bounding_polygon': wkt.loads(polygon_wkt),
+        'primary_ds_name': 'primary-ds-name',
+        'matchup_ds_names': 'matchup-ds-name',
+        'parameter_s': 'sst',
+        'start_time': start_time,
+        'start_seconds_from_epoch': start_time.timestamp(),
+        'end_time': end_time,
+        'end_seconds_from_epoch': end_time.timestamp(),
+        'depth_min': 1.0,
+        'depth_max': 2.0,
+        'time_tolerance': 3.0,
+        'radius_tolerance': 4.0,
+        'platforms': '1,2,3,4,5,6,7,8,9',
+        'match_once': True,
+        'result_size_limit': 10
+    }
+
+    def generate_fake_tile(tile_id):
+        tile = Tile()
+        tile.tile_id = tile_id
+        return tile
+
+    # Mock tiles
+    fake_tiles = [generate_fake_tile(idx) for idx in range(10)]
+    tile_service.find_tiles_in_polygon.return_value = fake_tiles
+
+    # Mock result
+    # Format of 'spark_result': keys=domspoint,values=list of domspoint
+
+    doms_point_args = {
+        'longitude': -180,
+        'latitude': -90,
+        'time': '2020-01-15T00:00:00Z'
+    }
+    d1_sat = DomsPoint(**doms_point_args)
+    d2_sat = DomsPoint(**doms_point_args)
+    d1_ins = DomsPoint(**doms_point_args)
+    d2_ins = DomsPoint(**doms_point_args)
+
+    d1_sat.satellite_var_name = 'sea_surface_temperature'
+    d2_sat.satellite_var_name = 'sea_surface_temperature'
+    d1_ins.satellite_var_name = 'sea_surface_temperature'
+    d2_ins.satellite_var_name = 'sea_surface_temperature'
+
+    d1_sat.satellite_var_value = 10.0
+    d2_sat.satellite_var_value = 20.0
+    d1_ins.satellite_var_value = 30.0
+    d2_ins.satellite_var_value = 40.0
+
+    fake_spark_result = {
+        d1_sat: [d1_ins, d2_ins],
+        d2_sat: [d1_ins, d2_ins],
+    }
+
+    matchup_obj = Matchup(tile_service_factory=tile_service_factory, 
sc=spark_context)
+    matchup_obj.parse_arguments = lambda _: [item for item in args.values()]
+
+    with mock.patch('webservice.algorithms_spark.Matchup.ResultsStorage') as 
mock_rs, \
+            mock.patch(
+                'webservice.algorithms_spark.Matchup.spark_matchup_driver') as 
mock_matchup_driver:
+        mock_rs.insertExecution.return_value = 1
+        mock_matchup_driver.return_value = fake_spark_result
+        matchup_result = matchup_obj.calc(request)
+
+        # Ensure the call to 'spark_matchup_driver' contains the expected 
params
+        assert len(mock_matchup_driver.call_args_list) == 1
+        matchup_driver_args = mock_matchup_driver.call_args_list[0].args
+        matchup_driver_kwargs = mock_matchup_driver.call_args_list[0].kwargs
+        assert matchup_driver_args[0] == [tile.tile_id for tile in fake_tiles]
+        assert wkt.loads(matchup_driver_args[1]).equals(wkt.loads(polygon_wkt))
+        assert matchup_driver_args[2] == args['primary_ds_name']
+        assert matchup_driver_args[3] == args['matchup_ds_names']
+        assert matchup_driver_args[4] == args['parameter_s']
+        assert matchup_driver_args[5] == args['depth_min']
+        assert matchup_driver_args[6] == args['depth_max']
+        assert matchup_driver_args[7] == args['time_tolerance']
+        assert matchup_driver_args[8] == args['radius_tolerance']
+        assert matchup_driver_args[9] == args['platforms']
+        assert matchup_driver_args[10] == args['match_once']
+        assert matchup_driver_args[11] == tile_service_factory
+        assert matchup_driver_kwargs['sc'] == spark_context
+
+        # Ensure the result of the matchup calculation is as expected
+
+        json_matchup_result = json.loads(matchup_result.toJson())
+        assert len(json_matchup_result['data']) == 2
+        assert len(json_matchup_result['data'][0]['matches']) == 2
+        assert len(json_matchup_result['data'][1]['matches']) == 2
+
+        for data in json_matchup_result['data']:
+            assert data['x'] == '-180'
+            assert data['y'] == '-90'
+            for matches in data['matches']:
+                assert matches['x'] == '-180'
+                assert matches['y'] == '-90'
+
+        assert json_matchup_result['data'][0]['sea_surface_temperature'] == 
10.0
+        assert json_matchup_result['data'][1]['sea_surface_temperature'] == 
20.0
+        assert 
json_matchup_result['data'][0]['matches'][0]['sea_surface_temperature'] == 30.0
+        assert 
json_matchup_result['data'][0]['matches'][1]['sea_surface_temperature'] == 40.0
+        assert 
json_matchup_result['data'][1]['matches'][0]['sea_surface_temperature'] == 30.0
+        assert 
json_matchup_result['data'][1]['matches'][1]['sea_surface_temperature'] == 40.0
+
+        assert json_matchup_result['details']['numInSituMatched'] == 4
+        assert json_matchup_result['details']['numGriddedMatched'] == 2
+
+
+def test_match_satellite_to_insitu(test_dir):
+    """
+    Test the test_match_satellite_to_insitu and ensure the matchup is
+    done as expected, where the tile points and in-situ points are all
+    known and the expected matchup points have been hand-calculated.
+
+    This test case mocks out all external dependencies, so Solr,
+    Cassandra, HTTP insitu requests, etc are all mocked.
+
+    The test points are as follows:
+
+    X (0, 20)                         X (20, 20)
+
+            O (5, 15)
+
+
+
+                     O (10, 10)
+
+
+
+
+                             O (18, 3)
+
+    X (0, 0)                        X (20, 0)
+
+    The 'X' points are the primary satellite points and the 'O' points
+    are the secondary satellite or insitu points
+
+    Visual inspection reveals that primary point (0, 20) should match
+    with secondary point (5, 15) and primary point (20, 0) should match
+    with (18, 3)
+    """
+    tile = Tile()
+    tile.tile_id = 1
+    tile.tile_min_lat = 0
+    tile.tile_max_lat = 20
+    tile.tile_min_lon = 0
+    tile.tile_max_lon = 20
+    tile.dataset = 'test-dataset'
+    tile.dataset_id = 123
+    tile.granule = 'test-granule-name'
+    tile.min_time = '2020-07-28T00:00:00'
+    tile.max_time = '2020-07-28T00:00:00'
+    tile.section_spec = 'test-section-spec'
+    tile.var_name = 'sea_surface_temperature'
+    tile.latitudes = np.array([0, 20], dtype=np.float32)
+    tile.longitudes = np.array([0, 20], dtype=np.float32)
+    tile.times = [1627490285]
+    tile.data = np.array([[[11.0, 21.0], [31.0, 41.0]]])
+    tile.get_indices = lambda: [[0, 0, 0], [0, 0, 1], [0, 1, 0], [0, 1, 1]]
+    tile.meta_data = {'wind_type': []}
+
+    tile_service_factory = mock.MagicMock()
+    tile_service = mock.MagicMock()
+    tile_service_factory.return_value = tile_service
+    tile_service.get_bounding_box.return_value = box(-90, -45, 90, 45)
+    tile_service.get_min_time.return_value = 1627490285
+    tile_service.get_max_time.return_value = 1627490285
+    tile_service.mask_tiles_to_polygon.return_value = [tile]
+
+    tile_ids = [1]
+    polygon_wkt = 'POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 
31.00, -34.98 29.54))'
+    primary_ds_name = 'primary-ds-name'
+    matchup_ds_names = 'test'
+    parameter = 'sst'
+    depth_min = 0.0
+    depth_max = 1.0
+    time_tolerance = 3.0
+    radius_tolerance = 1000000.0
+    platforms = '1,2,3,4,5,6,7,8,9'
+
+    class MockSparkParam:
+        def __init__(self, value):
+            self.value = value
+
+    with 
mock.patch('webservice.algorithms_spark.Matchup.edge_endpoints.getEndpointByName')
 as mock_edge_endpoints:
+        # Test the satellite->insitu branch
+        # By mocking the getEndpointsByName function we are forcing
+        # Matchup to think this dummy matchup dataset is an insitu
+        # dataset
+        mock_edge_endpoints.return_value = {'url': 'http://test-edge-url'}
+        matchup.query_edge = lambda *args, **kwargs: json.load(
+            open(os.path.join(test_dir, 'edge_response.json')))
+
+        match_args = dict(
+            tile_ids=tile_ids,
+            primary_b=MockSparkParam(primary_ds_name),
+            matchup_b=MockSparkParam(matchup_ds_names),
+            parameter_b=MockSparkParam(parameter),
+            tt_b=MockSparkParam(time_tolerance),
+            rt_b=MockSparkParam(radius_tolerance),
+            platforms_b=MockSparkParam(platforms),
+            bounding_wkt_b=MockSparkParam(polygon_wkt),
+            depth_min_b=MockSparkParam(depth_min),
+            depth_max_b=MockSparkParam(depth_max),
+            tile_service_factory=tile_service_factory
+        )
+
+        generator = matchup.match_satellite_to_insitu(**match_args)
+
+        def validate_matchup_result(matchup_result, insitu_matchup):
+            """
+            The matchup results for satellite->insitu vs
+            satellite->satellite are almost exactly the same so they
+            can be validated using the same logic. They are the same
+            because they represent the same data, except one test is in
+            tile format (sat to sat) and one is in edge point format
+            (insitu). The only difference is the data field is different
+            for satellite data.
+            """
+            # There should be two primary matchup points
+            assert len(matchup_result) == 2
+            # Each primary point matched with 1 matchup point
+            assert len(matchup_result[0]) == 2
+            assert len(matchup_result[1]) == 2
+            # Check that the satellite point was matched to the expected 
insitu point
+            assert matchup_result[0][1].latitude == 3.0
+            assert matchup_result[0][1].longitude == 18.0
+            assert matchup_result[1][1].latitude == 15.0
+            assert matchup_result[1][1].longitude == 5.0
+            # Check that the insitu points have the expected values
+            if insitu_matchup:
+                assert matchup_result[0][1].sst == 30.0
+                assert matchup_result[1][1].sst == 10.0
+            else:
+                assert matchup_result[0][1].satellite_var_value == 30.0
+                assert matchup_result[1][1].satellite_var_value == 10.0
+            # Check that the satellite points have the expected values
+            assert matchup_result[0][0].satellite_var_value == 21.0
+            assert matchup_result[1][0].satellite_var_value == 31.0
+
+        insitu_matchup_result = list(generator)
+        validate_matchup_result(insitu_matchup_result, insitu_matchup=True)
+
+        # Test the satellite->satellite branch
+        # By mocking the getEndpointsByName function to return None we
+        # are forcing Matchup to think this dummy matchup dataset is
+        # satellite dataset
+        mock_edge_endpoints.return_value = None
+
+        # Open the edge response json. We want to convert these points
+        # to tile points so we can test sat to sat matchup
+        edge_json = json.load(open(os.path.join(test_dir, 
'edge_response.json')))
+        points = [wkt.loads(result['point']) for result in 
edge_json['results']]
+
+        matchup_tile = Tile()
+        matchup_tile.tile_id = 1
+        matchup_tile.tile_min_lat = 3
+        matchup_tile.tile_max_lat = 15
+        matchup_tile.tile_min_lon = 5
+        matchup_tile.tile_max_lon = 18
+        matchup_tile.dataset = 'test-dataset'
+        matchup_tile.dataset_id = 123
+        matchup_tile.granule = 'test-granule-name'
+        matchup_tile.min_time = '2020-07-28T00:00:00'
+        matchup_tile.max_time = '2020-07-28T00:00:00'
+        matchup_tile.section_spec = 'test-section-spec'
+        matchup_tile.var_name = 'sea_surface_temperature'
+        matchup_tile.latitudes = np.array([point.y for point in points], 
dtype=np.float32)
+        matchup_tile.longitudes = np.array([point.x for point in points], 
dtype=np.float32)
+        matchup_tile.times = [edge_json['results'][0]['time']]
+        matchup_tile.data = np.array([[[10.0, 0, 0], [0, 20.0, 0], [0, 0, 
30.0]]])
+        matchup_tile.get_indices = lambda: [[0, 0, 0], [0, 1, 1], [0, 2, 2]]
+        matchup_tile.meta_data = {'wind_type': []}
+
+        tile_service.find_tiles_in_polygon.return_value = [matchup_tile]
+
+        generator = matchup.match_satellite_to_insitu(**match_args)
+
+        sat_matchup_result = list(generator)
+        validate_matchup_result(sat_matchup_result, insitu_matchup=False)
diff --git a/analysis/tests/conftest.py b/analysis/tests/conftest.py
new file mode 100644
index 0000000..43af49c
--- /dev/null
+++ b/analysis/tests/conftest.py
@@ -0,0 +1,34 @@
+import logging
+import pytest
+import os
+
+from pyspark import HiveContext
+from pyspark import SparkConf
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+
def quiet_py4j():
    """Reduce the py4j gateway's log chatter to WARN for the test session."""
    logging.getLogger('py4j').setLevel(logging.WARN)
+
@pytest.fixture(scope="session")
def spark_context(request):
    """Session-scoped local SparkContext shared by all Spark-backed tests.

    Runs Spark in local mode with two worker threads, quiets py4j logging,
    and stops the context when the test session ends.
    """
    conf = (SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing"))
    sc = SparkContext(conf=conf)
    # Register the finalizer only after the context exists. The original
    # registered it before creating `sc`, so a failure in SparkContext()
    # would make the finalizer raise NameError at teardown instead of
    # reporting the real error cleanly.
    request.addfinalizer(lambda: sc.stop())
    quiet_py4j()
    return sc
+
@pytest.fixture(scope="session")
def hive_context(spark_context):
    """Session-scoped HiveContext built on top of the shared SparkContext fixture."""
    context = HiveContext(spark_context)
    return context
+
@pytest.fixture(scope="session")
def streaming_context(spark_context):
    """Session-scoped StreamingContext with a one-second batch interval."""
    batch_interval_seconds = 1
    return StreamingContext(spark_context, batch_interval_seconds)
+
@pytest.fixture(scope="session")
def setup_pyspark_env():
    """Point PySpark at a Python interpreter for the driver and workers.

    The original hardcoded a developer-specific conda path
    ('/usr/local/anaconda3/envs/nexus-messages3/bin/python'), which breaks
    on every other machine. Default to the interpreter running the tests,
    and leave values alone when the caller has already set them.
    """
    import sys
    os.environ.setdefault('PYSPARK_DRIVER_PYTHON', sys.executable)
    os.environ.setdefault('PYSPARK_PYTHON', sys.executable)
\ No newline at end of file
diff --git a/analysis/tests/data/edge_response.json 
b/analysis/tests/data/edge_response.json
new file mode 100644
index 0000000..99be82d
--- /dev/null
+++ b/analysis/tests/data/edge_response.json
@@ -0,0 +1,24 @@
+{
+  "totalResults": 3,
+  "results": [
+    {
+      "point": "Point(5.0 15.0)",
+      "time": 1595954285,
+      "sea_water_temperature": 10.0,
+      "id": 1234
+    },
+    {
+      "point": "Point(10.0 10.0)",
+      "time": 1595954286,
+      "sea_water_temperature": 20.0,
+      "id": 1235
+    },
+    {
+      "point": "Point(18.0 3.0)",
+      "time": 1595954287,
+      "sea_water_temperature": 30.0,
+      "id": 1236
+    }
+  ]
+}
+
diff --git a/analysis/tests/integration/algorithms_spark/test_matchup.py 
b/analysis/tests/integration/algorithms_spark/test_matchup.py
new file mode 100644
index 0000000..61bd04b
--- /dev/null
+++ b/analysis/tests/integration/algorithms_spark/test_matchup.py
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+import pytest
+from shapely import wkt
+from webservice.algorithms_spark.Matchup import *
+
+
@pytest.mark.integration
class TestMatchup(unittest.TestCase):
    """Integration tests for spark_matchup_driver.

    These require a local Spark installation plus running Solr/Cassandra
    instances with the referenced datasets ingested; they are excluded from
    unit-test runs via the 'integration' marker.
    """

    # Bounding polygon shared by every test case (SPURS region).
    _POLYGON_WKT = "POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))"

    def setUp(self):
        # NOTE(review): machine-specific paths — these only work on the
        # original author's workstation and should come from the
        # environment instead.
        from os import environ
        environ['PYSPARK_DRIVER_PYTHON'] = '/Users/greguska/anaconda/envs/nexus-analysis/bin/python2.7'
        environ['PYSPARK_PYTHON'] = '/Users/greguska/anaconda/envs/nexus-analysis/bin/python2.7'
        environ['SPARK_HOME'] = '/Users/greguska/sandbox/spark-2.0.0-bin-hadoop2.7'

    def _run_matchup(self, primary_ds, parameter, start_time, end_time, radius_tolerance):
        """Run the matchup driver over the shared polygon and return its result dict."""
        from shapely.wkt import loads
        from nexustiles.nexustiles import NexusTileService

        polygon = loads(self._POLYGON_WKT)
        matchup_ds = "spurs"
        time_tolerance = 86400
        depth_tolerance = 5.0
        platforms = "1,2,3,4,5,6,7,8,9"

        tile_service = NexusTileService()
        tile_ids = [tile.tile_id for tile in
                    tile_service.find_tiles_in_polygon(polygon, primary_ds, start_time, end_time,
                                                       fetch_data=False, fl='id')]
        return spark_matchup_driver(tile_ids, wkt.dumps(polygon), primary_ds, matchup_ds, parameter,
                                    time_tolerance, depth_tolerance, radius_tolerance, platforms)

    @staticmethod
    def _print_sst_result(result):
        # Dump each primary point and its matches for manual inspection.
        for k, v in result.items():
            print("primary: %s\n\tmatches:\n\t\t%s" % (
                "lon: %s, lat: %s, time: %s, sst: %s" % (k.longitude, k.latitude, k.time, k.sst),
                '\n\t\t'.join(
                    ["lon: %s, lat: %s, time: %s, sst: %s" % (i.longitude, i.latitude, i.time, i.sst) for i in v])))

    def test_mur_match(self):
        result = self._run_matchup(
            primary_ds="JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1",
            parameter="sst",
            start_time=1350259200,  # 2012-10-15T00:00:00Z
            end_time=1350345600,  # 2012-10-16T00:00:00Z
            radius_tolerance=1500.0)
        self._print_sst_result(result)

    def test_smap_match(self):
        result = self._run_matchup(
            primary_ds="SMAP_L2B_SSS",
            parameter="sss",
            start_time=1350259200,  # 2012-10-15T00:00:00Z
            end_time=1350345600,  # 2012-10-16T00:00:00Z
            radius_tolerance=1500.0)
        # NOTE(review): the original printed `.sst` attributes here even
        # though the parameter is sss; preserved for output parity.
        self._print_sst_result(result)

    def test_ascatb_match(self):
        result = self._run_matchup(
            primary_ds="ASCATB-L2-Coastal",
            parameter="wind",
            start_time=1351468800,  # 2012-10-29T00:00:00Z
            end_time=1351555200,  # 2012-10-30T00:00:00Z
            radius_tolerance=110000.0)  # 110 km
        for k, v in result.items():
            print("primary: %s\n\tmatches:\n\t\t%s" % (
                "lon: %s, lat: %s, time: %s, wind u,v: %s,%s" % (k.longitude, k.latitude, k.time, k.wind_u, k.wind_v),
                '\n\t\t'.join(
                    ["lon: %s, lat: %s, time: %s, wind u,v: %s,%s" % (
                        i.longitude, i.latitude, i.time, i.wind_u, i.wind_v) for i in v])))
diff --git a/analysis/webservice/algorithms/doms/BaseDomsHandler.py 
b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
index c91c883..dbfc692 100644
--- a/analysis/webservice/algorithms/doms/BaseDomsHandler.py
+++ b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
@@ -74,6 +74,8 @@ class DomsEncoder(json.JSONEncoder):
             return int((obj - EPOCH).total_seconds())
         elif isinstance(obj, Decimal):
             return str(obj)
+        elif isinstance(obj, np.float32):
+            return float(obj)
         else:
             return json.JSONEncoder.default(self, obj)
 
diff --git a/analysis/webservice/algorithms_spark/Matchup.py 
b/analysis/webservice/algorithms_spark/Matchup.py
index a5277b3..f6275f4 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -18,6 +18,7 @@
 import json
 import logging
 import threading
+from shapely.geometry import Polygon
 from datetime import datetime
 from itertools import chain
 from math import cos, radians
@@ -434,6 +435,10 @@ class DomsPoint(object):
         except KeyError:
             point.data_id = "%s:%s:%s" % (point.time, point.longitude, 
point.latitude)
 
+        if 'var_name' in edge_point and 'var_value' in edge_point:
+            point.satellite_var_name = edge_point['var_name']
+            point.satellite_var_value = edge_point['var_value']
+
         return point
 
 
@@ -463,11 +468,25 @@ def spark_matchup_driver(tile_ids, bounding_wkt, 
primary_ds_name, matchup_ds_nam
 
     # Map Partitions ( list(tile_id) )
     rdd_filtered = rdd.mapPartitions(
-        partial(match_satellite_to_insitu, primary_b=primary_b, 
matchup_b=matchup_b, parameter_b=parameter_b, tt_b=tt_b,
-                rt_b=rt_b, platforms_b=platforms_b, 
bounding_wkt_b=bounding_wkt_b, depth_min_b=depth_min_b,
-                depth_max_b=depth_max_b, 
tile_service_factory=tile_service_factory), preservesPartitioning=True) \
-        .filter(lambda p_m_tuple: abs(
-        iso_time_to_epoch(p_m_tuple[0].time) - 
iso_time_to_epoch(p_m_tuple[1].time)) <= time_tolerance)
+        partial(
+            match_satellite_to_insitu,
+            primary_b=primary_b,
+            matchup_b=matchup_b,
+            parameter_b=parameter_b,
+            tt_b=tt_b,
+            rt_b=rt_b,
+            platforms_b=platforms_b,
+            bounding_wkt_b=bounding_wkt_b,
+            depth_min_b=depth_min_b,
+            depth_max_b=depth_max_b,
+            tile_service_factory=tile_service_factory
+        ),
+        preservesPartitioning=True
+    ).filter(
+        lambda p_m_tuple: abs(
+            iso_time_to_epoch(p_m_tuple[0].time) - 
iso_time_to_epoch(p_m_tuple[1].time)
+        ) <= time_tolerance
+    )
 
     if match_once:
         # Only the 'nearest' point for each primary should be returned. Add an 
extra map/reduce which calculates
@@ -523,6 +542,25 @@ def add_meters_to_lon_lat(lon, lat, meters):
     return longitude, latitude
 
 
def tile_to_edge_points(tile):
    """Convert a satellite tile's valid observations into edge-style point dicts.

    Each entry mirrors the shape of an in-situ edge record (WKT 'point',
    ISO-8601 UTC 'time', 'source', 'fileurl', variable name/value) so that
    satellite tiles can flow through the same matchup path as in-situ data.
    'platform' and 'device' have no satellite equivalent and are left as None.
    """
    return [
        {
            'point': f'Point({tile.longitudes[idx[2]]} {tile.latitudes[idx[1]]})',
            'time': datetime.utcfromtimestamp(tile.times[idx[0]]).strftime('%Y-%m-%dT%H:%M:%SZ'),
            'source': tile.dataset,
            'platform': None,
            'device': None,
            'fileurl': tile.granule,
            'var_name': tile.var_name,
            # idx is (time, lat, lon); tuple-index pulls the scalar value
            'var_value': tile.data[tuple(idx)]
        }
        for idx in tile.get_indices()
    ]
+
+
 def match_satellite_to_insitu(tile_ids, primary_b, matchup_b, parameter_b, 
tt_b, rt_b, platforms_b,
                               bounding_wkt_b, depth_min_b, depth_max_b, 
tile_service_factory):
     the_time = datetime.now()
@@ -560,38 +598,77 @@ def match_satellite_to_insitu(tile_ids, primary_b, 
matchup_b, parameter_b, tt_b,
         str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]))
 
     # Query edge for all points within the spatial-temporal extents of this 
partition
-    the_time = datetime.now()
-    edge_session = requests.Session()
-    edge_results = []
-    with edge_session:
-        for insitudata_name in matchup_b.value.split(','):
-            bbox = ','.join(
-                [str(matchup_min_lon), str(matchup_min_lat), 
str(matchup_max_lon), str(matchup_max_lat)])
-            edge_response = query_edge(insitudata_name, parameter_b.value, 
matchup_min_time, matchup_max_time, bbox,
-                                       platforms_b.value, depth_min_b.value, 
depth_max_b.value, session=edge_session)
-            if edge_response['totalResults'] == 0:
-                continue
-            r = edge_response['results']
-            for p in r:
-                p['source'] = insitudata_name
-            edge_results.extend(r)
-    print("%s Time to call edge for partition %s to %s" % (str(datetime.now() 
- the_time), tile_ids[0], tile_ids[-1]))
-    if len(edge_results) == 0:
-        return []
+    is_insitu_dataset = edge_endpoints.getEndpointByName(matchup_b.value)
 
-    # Convert edge points to utm
-    the_time = datetime.now()
-    matchup_points = np.ndarray((len(edge_results), 2), dtype=np.float32)
-    for n, edge_point in enumerate(edge_results):
-        try:
-            x, y = wkt.loads(edge_point['point']).coords[0]
-        except WKTReadingError:
+    if is_insitu_dataset:
+        the_time = datetime.now()
+        edge_session = requests.Session()
+        edge_results = []
+        with edge_session:
+            for insitudata_name in matchup_b.value.split(','):
+                bbox = ','.join(
+                    [str(matchup_min_lon), str(matchup_min_lat), 
str(matchup_max_lon), str(matchup_max_lat)])
+                edge_response = query_edge(insitudata_name, parameter_b.value, 
matchup_min_time, matchup_max_time, bbox,
+                                           platforms_b.value, 
depth_min_b.value, depth_max_b.value, session=edge_session)
+                if edge_response['totalResults'] == 0:
+                    continue
+                r = edge_response['results']
+                for p in r:
+                    p['source'] = insitudata_name
+                edge_results.extend(r)
+        print("%s Time to call edge for partition %s to %s" % 
(str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]))
+        if len(edge_results) == 0:
+            return []
+
+        # Convert edge points to utm
+        the_time = datetime.now()
+        matchup_points = np.ndarray((len(edge_results), 2), dtype=np.float32)
+        for n, edge_point in enumerate(edge_results):
             try:
-                x, y = Point(*[float(c) for c in edge_point['point'].split(' 
')]).coords[0]
-            except ValueError:
-                y, x = Point(*[float(c) for c in 
edge_point['point'].split(',')]).coords[0]
+                x, y = wkt.loads(edge_point['point']).coords[0]
+            except WKTReadingError:
+                try:
+                    x, y = Point(*[float(c) for c in 
edge_point['point'].split(' ')]).coords[0]
+                except ValueError:
+                    y, x = Point(*[float(c) for c in 
edge_point['point'].split(',')]).coords[0]
+
+            matchup_points[n][0], matchup_points[n][1] = aeqd_proj(x, y)
+    else:
+        # Matchup dataset is satellite data: query the tile service for matching tiles.
+        bbox = ','.join(
+            [str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon),
+             str(matchup_max_lat)])
+        west, south, east, north = [float(b) for b in bbox.split(",")]
+        polygon = Polygon(
+            [(west, south), (east, south), (east, north), (west, north), 
(west, south)])
+
+        matchup_tiles = tile_service.find_tiles_in_polygon(
+            bounding_polygon=polygon,
+            ds=matchup_b.value,
+            start_time=matchup_min_time,
+            end_time=matchup_max_time,
+            fetch_data=True,
+            sort=['tile_min_time_dt asc', 'tile_min_lon asc', 'tile_min_lat 
asc'],
+            rows=5000
+        )
+
+        # Project each tile's valid lat/lon points via the azimuthal equidistant projection.
+        matchup_points = []
+        for tile in matchup_tiles:
+            valid_indices = tile.get_indices()
+            primary_points = np.array([aeqd_proj(
+                tile.longitudes[aslice[2]],
+                tile.latitudes[aslice[1]]
+            ) for aslice in valid_indices])
+            matchup_points.extend(primary_points)
+
+        # Convert tiles to 'edge points' which match the format of in-situ 
edge points.
+        edge_results = []
+        for matchup_tile in matchup_tiles:
+            edge_results.extend(tile_to_edge_points(matchup_tile))
+
+        matchup_points = np.array(matchup_points)
 
-        matchup_points[n][0], matchup_points[n][1] = aeqd_proj(x, y)
     print("%s Time to convert match points for partition %s to %s" % (
         str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]))
 

Reply via email to