This is an automated email from the ASF dual-hosted git repository.

tloubrieu pushed a commit to branch SDAP-268
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit c905c373c33b71d116d26465254e0012658f114d Author: Eamon Ford <[email protected]> AuthorDate: Wed Aug 5 15:27:27 2020 -0700 revert --- analysis/tests/algorithms_spark/Matchup_test.py | 321 +++++++++++ analysis/tests/algorithms_spark/__init__.py | 16 + analysis/webservice/algorithms_spark/Matchup.py | 703 ++++++++++++++++++++++++ 3 files changed, 1040 insertions(+) diff --git a/analysis/tests/algorithms_spark/Matchup_test.py b/analysis/tests/algorithms_spark/Matchup_test.py new file mode 100644 index 0000000..5dee17c --- /dev/null +++ b/analysis/tests/algorithms_spark/Matchup_test.py @@ -0,0 +1,321 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import pickle +import random +import timeit +import unittest + +from webservice.algorithms_spark.Matchup import * + + +class TestMatch_Points(unittest.TestCase): + def test_one_point_match_exact(self): + primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1) + matchup = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=2) + + primary_points = [primary] + matchup_points = [matchup] + + matches = list(match_points_generator(primary_points, matchup_points, 0)) + + self.assertEquals(1, len(matches)) + + p_match_point, match = matches[0] + + self.assertEqual(primary, p_match_point) + self.assertEqual(matchup, match) + + def test_one_point_match_within_tolerance_150km(self): + primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1) + matchup = DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=5.0, data_id=2) + + primary_points = [primary] + matchup_points = [matchup] + + matches = list(match_points_generator(primary_points, matchup_points, 150000)) # tolerance 150 km + + self.assertEquals(1, len(matches)) + + p_match_point, match = matches[0] + + self.assertEqual(primary, p_match_point) + self.assertEqual(matchup, match) + + def test_one_point_match_within_tolerance_200m(self): + primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1) + matchup = DomsPoint(longitude=1.001, latitude=2.0, time=1000, depth=5.0, data_id=2) + + primary_points = [primary] + matchup_points = [matchup] + + matches = list(match_points_generator(primary_points, matchup_points, 200)) # tolerance 200 m + + self.assertEquals(1, len(matches)) + + p_match_point, match = matches[0] + + self.assertEqual(primary, p_match_point) + self.assertEqual(matchup, match) + + def test_one_point_not_match_tolerance_150km(self): + primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1) + matchup = DomsPoint(longitude=1.0, latitude=4.0, time=1000, depth=5.0, data_id=2) + + primary_points = [primary] + matchup_points = [matchup] + + matches = list(match_points_generator(primary_points, matchup_points, 150000)) # tolerance 150 km + + 
self.assertEquals(0, len(matches)) + + def test_one_point_not_match_tolerance_100m(self): + primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1) + matchup = DomsPoint(longitude=1.001, latitude=2.0, time=1000, depth=5.0, data_id=2) + + primary_points = [primary] + matchup_points = [matchup] + + matches = list(match_points_generator(primary_points, matchup_points, 100)) # tolerance 100 m + + self.assertEquals(0, len(matches)) + + def test_multiple_point_match(self): + primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1) + primary_points = [primary] + + matchup_points = [ + DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=10.0, data_id=2), + DomsPoint(longitude=2.0, latitude=2.0, time=1000, depth=0.0, data_id=3), + DomsPoint(longitude=0.5, latitude=1.5, time=1000, depth=3.0, data_id=4) + ] + + matches = list(match_points_generator(primary_points, matchup_points, 150000)) # tolerance 150 km + + self.assertEquals(3, len(matches)) + + self.assertSetEqual({primary}, {x[0] for x in matches}) + + list_of_matches = [x[1] for x in matches] + + self.assertEquals(3, len(list_of_matches)) + self.assertItemsEqual(matchup_points, list_of_matches) + + def test_multiple_point_match_multiple_times(self): + primary_points = [ + DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1), + DomsPoint(longitude=1.5, latitude=1.5, time=1000, depth=5.0, data_id=2) + ] + + matchup_points = [ + DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=10.0, data_id=3), + DomsPoint(longitude=2.0, latitude=2.0, time=1000, depth=0.0, data_id=4), + DomsPoint(longitude=0.5, latitude=1.5, time=1000, depth=3.0, data_id=5) + ] + + matches = list(match_points_generator(primary_points, matchup_points, 150000)) # tolerance 150 km + + self.assertEquals(5, len(matches)) + + self.assertSetEqual({p for p in primary_points}, {x[0] for x in matches}) + + # First primary point matches all 3 secondary + self.assertEquals(3, [x[0] for x in matches].count(primary_points[0])) + self.assertItemsEqual(matchup_points, [x[1] for x in matches if x[0] == primary_points[0]]) + + # Second primary point matches only last 2 secondary + self.assertEquals(2, [x[0] for x in matches].count(primary_points[1])) + self.assertItemsEqual(matchup_points[1:], [x[1] for x in matches if x[0] == primary_points[1]]) + + def test_one_of_many_primary_matches_one_of_many_matchup(self): + primary_points = [ + DomsPoint(longitude=-33.76764, latitude=30.42946, time=1351553994, data_id=1), + DomsPoint(longitude=-33.75731, latitude=29.86216, time=1351554004, data_id=2) + ] + + matchup_points = [ + DomsPoint(longitude=-33.762, latitude=28.877, time=1351521432, depth=3.973, data_id=3), + DomsPoint(longitude=-34.916, latitude=28.879, time=1351521770, depth=2.9798, data_id=4), + DomsPoint(longitude=-31.121, latitude=31.256, time=1351519892, depth=4.07, data_id=5) + ] + + matches = list(match_points_generator(primary_points, matchup_points, 110000)) # tolerance 110 km + + self.assertEquals(1, len(matches)) + + self.assertSetEqual({p for p in primary_points if p.data_id == 2}, {x[0] for x in matches}) + + # First primary point matches none + self.assertEquals(0, [x[0] for x in matches].count(primary_points[0])) + + # Second primary point matches only first secondary + self.assertEquals(1, [x[0] for x in matches].count(primary_points[1])) + self.assertItemsEqual(matchup_points[0:1], [x[1] for x in matches if x[0] == primary_points[1]]) + + @unittest.skip("This test is just for timing, doesn't actually 
assert anything.") + def test_time_many_primary_many_matchup(self): + import logging + import sys + logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout) + log = logging.getLogger(__name__) + # Generate 160000 DomsPoints distributed equally in a box from -2.0 lat/lon to 2.0 lat/lon + log.info("Generating primary points") + x = np.arange(-2.0, 2.0, 0.01) + y = np.arange(-2.0, 2.0, 0.01) + primary_points = [DomsPoint(longitude=xy[0], latitude=xy[1], time=1000, depth=5.0, data_id=i) for i, xy in + enumerate(np.array(np.meshgrid(x, y)).T.reshape(-1, 2))] + + # Generate 2000 DomsPoints distributed randomly in a box from -2.0 lat/lon to 2.0 lat/lon + log.info("Generating matchup points") + matchup_points = [ + DomsPoint(longitude=random.uniform(-2.0, 2.0), latitude=random.uniform(-2.0, 2.0), time=1000, depth=5.0, + data_id=i) for i in xrange(0, 2000)] + + log.info("Starting matchup") + log.info("Best of repeat(3, 2) matchups: %s seconds" % min( + timeit.repeat(lambda: list(match_points_generator(primary_points, matchup_points, 1500)), repeat=3, + number=2))) + + +class TestDOMSPoint(unittest.TestCase): + def test_is_pickleable(self): + edge_point = json.loads("""{ +"id": "argo-profiles-5903995(46, 0)", +"time": "2012-10-15T14:24:04Z", +"point": "-33.467 29.728", +"sea_water_temperature": 24.5629997253, +"sea_water_temperature_depth": 2.9796258642, +"wind_speed": null, +"sea_water_salinity": null, +"sea_water_salinity_depth": null, +"platform": 4, +"device": 3, +"fileurl": "ftp://podaac-ftp.jpl.nasa.gov/allData/insitu/L2/spurs1/argo/argo-profiles-5903995.nc" +}""") + point = DomsPoint.from_edge_point(edge_point) + self.assertIsNotNone(pickle.dumps(point)) + + +def check_all(): + return check_solr() and check_cass() and check_edge() + + +def check_solr(): + # TODO eventually this might do something. + return False + + +def check_cass(): + # TODO eventually this might do something. + return False + + +def check_edge(): + # TODO eventually this might do something. 
+ return False + + [email protected](check_all(), + "These tests require local instances of Solr, Cassandra, and Edge to be running.") +class TestMatchup(unittest.TestCase): + def setUp(self): + from os import environ + environ['PYSPARK_DRIVER_PYTHON'] = '/Users/greguska/anaconda/envs/nexus-analysis/bin/python2.7' + environ['PYSPARK_PYTHON'] = '/Users/greguska/anaconda/envs/nexus-analysis/bin/python2.7' + environ['SPARK_HOME'] = '/Users/greguska/sandbox/spark-2.0.0-bin-hadoop2.7' + + def test_mur_match(self): + from shapely.wkt import loads + from nexustiles.nexustiles import NexusTileService + + polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))") + primary_ds = "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1" + matchup_ds = "spurs" + parameter = "sst" + start_time = 1350259200 # 2012-10-15T00:00:00Z + end_time = 1350345600 # 2012-10-16T00:00:00Z + time_tolerance = 86400 + depth_tolerance = 5.0 + radius_tolerance = 1500.0 + platforms = "1,2,3,4,5,6,7,8,9" + + tile_service = NexusTileService() + tile_ids = [tile.tile_id for tile in + tile_service.find_tiles_in_polygon(polygon, primary_ds, start_time, end_time, fetch_data=False, + fl='id')] + result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), primary_ds, matchup_ds, parameter, time_tolerance, + depth_tolerance, radius_tolerance, platforms) + for k, v in result.iteritems(): + print "primary: %s\n\tmatches:\n\t\t%s" % ( + "lon: %s, lat: %s, time: %s, sst: %s" % (k.longitude, k.latitude, k.time, k.sst), + '\n\t\t'.join( + ["lon: %s, lat: %s, time: %s, sst: %s" % (i.longitude, i.latitude, i.time, i.sst) for i in v])) + + def test_smap_match(self): + from shapely.wkt import loads + from nexustiles.nexustiles import NexusTileService + + polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))") + primary_ds = "SMAP_L2B_SSS" + matchup_ds = "spurs" + parameter = "sss" + start_time = 1350259200 # 2012-10-15T00:00:00Z + end_time = 1350345600 # 2012-10-16T00:00:00Z + time_tolerance = 86400 + depth_tolerance = 5.0 + radius_tolerance = 1500.0 + platforms = "1,2,3,4,5,6,7,8,9" + + tile_service = NexusTileService() + tile_ids = [tile.tile_id for tile in + tile_service.find_tiles_in_polygon(polygon, primary_ds, start_time, end_time, fetch_data=False, + fl='id')] + result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), primary_ds, matchup_ds, parameter, time_tolerance, + depth_tolerance, radius_tolerance, platforms) + for k, v in result.iteritems(): + print "primary: %s\n\tmatches:\n\t\t%s" % ( + "lon: %s, lat: %s, time: %s, sst: %s" % (k.longitude, k.latitude, k.time, k.sst), + '\n\t\t'.join( + ["lon: %s, lat: %s, time: %s, sst: %s" % (i.longitude, i.latitude, i.time, i.sst) for i in v])) + + def test_ascatb_match(self): + from shapely.wkt import loads + from nexustiles.nexustiles import NexusTileService + + polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))") + primary_ds = "ASCATB-L2-Coastal" + matchup_ds = "spurs" + parameter = "wind" + start_time = 1351468800 # 2012-10-29T00:00:00Z + end_time = 1351555200 # 2012-10-30T00:00:00Z + time_tolerance = 86400 + depth_tolerance = 5.0 + radius_tolerance = 110000.0 # 110 km + platforms = "1,2,3,4,5,6,7,8,9" + + tile_service = NexusTileService() + tile_ids = [tile.tile_id for tile in + tile_service.find_tiles_in_polygon(polygon, primary_ds, start_time, end_time, fetch_data=False, + fl='id')] + result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), primary_ds, matchup_ds, 
parameter, time_tolerance, + depth_tolerance, radius_tolerance, platforms) + for k, v in result.iteritems(): + print "primary: %s\n\tmatches:\n\t\t%s" % ( + "lon: %s, lat: %s, time: %s, wind u,v: %s,%s" % (k.longitude, k.latitude, k.time, k.wind_u, k.wind_v), + '\n\t\t'.join( + ["lon: %s, lat: %s, time: %s, wind u,v: %s,%s" % ( + i.longitude, i.latitude, i.time, i.wind_u, i.wind_v) for i in v])) diff --git a/analysis/tests/algorithms_spark/__init__.py b/analysis/tests/algorithms_spark/__init__.py new file mode 100644 index 0000000..0707368 --- /dev/null +++ b/analysis/tests/algorithms_spark/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py new file mode 100644 index 0000000..9ae7557 --- /dev/null +++ b/analysis/webservice/algorithms_spark/Matchup.py @@ -0,0 +1,703 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + + +import json +import logging +import threading +from datetime import datetime +from itertools import chain +from math import cos, radians + +import numpy as np +import pyproj +import requests +from nexustiles.nexustiles import NexusTileService +from pytz import timezone, UTC +from scipy import spatial +from shapely import wkt +from shapely.geometry import Point +from shapely.geometry import box +from shapely.geos import WKTReadingError + +from webservice.NexusHandler import nexus_handler +from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler +from webservice.algorithms.doms import config as edge_endpoints +from webservice.algorithms.doms import values as doms_values +from webservice.algorithms.doms.BaseDomsHandler import DomsQueryResults +from webservice.algorithms.doms.ResultsStorage import ResultsStorage +from webservice.webmodel import NexusProcessingException + +EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) +ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' + + +def iso_time_to_epoch(str_time): + return (datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%SZ").replace( + tzinfo=UTC) - EPOCH).total_seconds() + + +@nexus_handler +class Matchup(NexusCalcSparkHandler): + name = "Matchup" + path = "/match_spark" + description = "Match measurements between two or more datasets" + + params = { + "primary": { + "name": "Primary Dataset", + "type": "string", + "description": "The Primary dataset used to find matches for. Required" + }, + "matchup": { + "name": "Match-Up Datasets", + "type": "comma-delimited string", + "description": "The Dataset(s) being searched for measurements that match the Primary. Required" + }, + "parameter": { + "name": "Match-Up Parameter", + "type": "string", + "description": "The parameter of interest used for the match up. One of 'sst', 'sss', 'wind'. Required" + }, + "startTime": { + "name": "Start Time", + "type": "string", + "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required" + }, + "endTime": { + "name": "End Time", + "type": "string", + "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required" + }, + "b": { + "name": "Bounding box", + "type": "comma-delimited float", + "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, " + "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required" + }, + "depthMin": { + "name": "Minimum Depth", + "type": "float", + "description": "Minimum depth of measurements. Must be less than depthMax. Optional. Default: no limit" + }, + "depthMax": { + "name": "Maximum Depth", + "type": "float", + "description": "Maximum depth of measurements. Must be greater than depthMin. Optional. Default: no limit" + }, + "tt": { + "name": "Time Tolerance", + "type": "long", + "description": "Tolerance in time (seconds) when comparing two measurements. Optional. Default: 86400" + }, + "rt": { + "name": "Radius Tolerance", + "type": "float", + "description": "Tolerance in radius (meters) when comparing two measurements. Optional. Default: 1000" + }, + "platforms": { + "name": "Platforms", + "type": "comma-delimited integer", + "description": "Platforms to include for matchup consideration. Required" + }, + "matchOnce": { + "name": "Match Once", + "type": "boolean", + "description": "Optional True/False flag used to determine if more than one match per primary point is returned. " + + "If true, only the nearest point will be returned for each primary point. 
" + + "If false, all points within the tolerances will be returned for each primary point. Default: False" + }, + "resultSizeLimit": { + "name": "Result Size Limit", + "type": "int", + "description": "Optional integer value that limits the number of results returned from the matchup. " + "If the number of primary matches is greater than this limit, the service will respond with " + "(HTTP 202: Accepted) and an empty response body. A value of 0 means return all results. " + "Default: 500" + } + } + singleton = True + + def __init__(self, algorithm_config=None, sc=None): + NexusCalcSparkHandler.__init__(self, algorithm_config=algorithm_config, sc=sc, skipCassandra=True) + self.log = logging.getLogger(__name__) + + def parse_arguments(self, request): + # Parse input arguments + self.log.debug("Parsing arguments") + try: + bounding_polygon = request.get_bounding_polygon() + except: + raise NexusProcessingException( + reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude", + code=400) + primary_ds_name = request.get_argument('primary', None) + if primary_ds_name is None: + raise NexusProcessingException(reason="'primary' argument is required", code=400) + matchup_ds_names = request.get_argument('matchup', None) + if matchup_ds_names is None: + raise NexusProcessingException(reason="'matchup' argument is required", code=400) + + parameter_s = request.get_argument('parameter', 'sst') + if parameter_s not in ['sst', 'sss', 'wind']: + raise NexusProcessingException( + reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400) + + try: + start_time = request.get_start_datetime() + except: + raise NexusProcessingException( + reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", + code=400) + try: + end_time = request.get_end_datetime() + except: + raise NexusProcessingException( + reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", + code=400) + + if start_time > end_time: + raise NexusProcessingException( + reason="The starting time must be before the ending time. 
Received startTime: %s, endTime: %s" % ( + request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)), + code=400) + + depth_min = request.get_decimal_arg('depthMin', default=None) + depth_max = request.get_decimal_arg('depthMax', default=None) + + if depth_min is not None and depth_max is not None and depth_min >= depth_max: + raise NexusProcessingException( + reason="Depth Min should be less than Depth Max", code=400) + + time_tolerance = request.get_int_arg('tt', default=86400) + radius_tolerance = request.get_decimal_arg('rt', default=1000.0) + platforms = request.get_argument('platforms', None) + if platforms is None: + raise NexusProcessingException(reason="'platforms' argument is required", code=400) + try: + p_validation = platforms.split(',') + p_validation = [int(p) for p in p_validation] + del p_validation + except: + raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400) + + match_once = request.get_boolean_arg("matchOnce", default=False) + + result_size_limit = request.get_int_arg("resultSizeLimit", default=500) + + start_seconds_from_epoch = long((start_time - EPOCH).total_seconds()) + end_seconds_from_epoch = long((end_time - EPOCH).total_seconds()) + + return bounding_polygon, primary_ds_name, matchup_ds_names, parameter_s, \ + start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \ + depth_min, depth_max, time_tolerance, radius_tolerance, \ + platforms, match_once, result_size_limit + + def calc(self, request, **args): + start = datetime.utcnow() + # TODO Assuming Satellite primary + bounding_polygon, primary_ds_name, matchup_ds_names, parameter_s, \ + start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \ + depth_min, depth_max, time_tolerance, radius_tolerance, \ + platforms, match_once, result_size_limit = self.parse_arguments(request) + + with ResultsStorage() as resultsStorage: + + execution_id = str(resultsStorage.insertExecution(None, start, None, None)) + + self.log.debug("Querying for tiles in search domain") + # Get tile ids in box + tile_ids = [tile.tile_id for tile in + self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name, + start_seconds_from_epoch, end_seconds_from_epoch, + fetch_data=False, fl='id', + sort=['tile_min_time_dt asc', 'tile_min_lon asc', + 'tile_min_lat asc'], rows=5000)] + + # Call spark_matchup + self.log.debug("Calling Spark Driver") + try: + spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name, + matchup_ds_names, parameter_s, depth_min, depth_max, time_tolerance, + radius_tolerance, platforms, match_once, sc=self._sc) + except Exception as e: + self.log.exception(e) + raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500) + + end = datetime.utcnow() + + self.log.debug("Building and saving results") + args = { + "primary": primary_ds_name, + "matchup": matchup_ds_names, + "startTime": start_time, + "endTime": end_time, + "bbox": request.get_argument('b'), + "timeTolerance": time_tolerance, + "radiusTolerance": float(radius_tolerance), + "platforms": platforms, + "parameter": parameter_s + } + + if depth_min is not None: + args["depthMin"] = float(depth_min) + + if depth_max is not None: + args["depthMax"] = float(depth_max) + + total_keys = len(spark_result.keys()) + total_values = sum(len(v) for v in spark_result.itervalues()) + details = { + "timeToComplete": int((end - start).total_seconds()), + 
"numInSituRecords": 0, + "numInSituMatched": total_values, + "numGriddedChecked": 0, + "numGriddedMatched": total_keys + } + + matches = Matchup.convert_to_matches(spark_result) + + def do_result_insert(): + with ResultsStorage() as storage: + storage.insertResults(results=matches, params=args, stats=details, + startTime=start, completeTime=end, userEmail="", + execution_id=execution_id) + + threading.Thread(target=do_result_insert).start() + + if 0 < result_size_limit < len(matches): + result = DomsQueryResults(results=None, args=args, details=details, bounds=None, count=None, + computeOptions=None, executionId=execution_id, status_code=202) + else: + result = DomsQueryResults(results=matches, args=args, details=details, bounds=None, count=None, + computeOptions=None, executionId=execution_id) + + return result + + @classmethod + def convert_to_matches(cls, spark_result): + matches = [] + for primary_domspoint, matched_domspoints in spark_result.iteritems(): + p_matched = [cls.domspoint_to_dict(p_match) for p_match in matched_domspoints] + + primary = cls.domspoint_to_dict(primary_domspoint) + primary['matches'] = list(p_matched) + matches.append(primary) + return matches + + @staticmethod + def domspoint_to_dict(domspoint): + return { + "sea_water_temperature": domspoint.sst, + "sea_water_temperature_depth": domspoint.sst_depth, + "sea_water_salinity": domspoint.sss, + "sea_water_salinity_depth": domspoint.sss_depth, + "wind_speed": domspoint.wind_speed, + "wind_direction": domspoint.wind_direction, + "wind_u": domspoint.wind_u, + "wind_v": domspoint.wind_v, + "platform": doms_values.getPlatformById(domspoint.platform), + "device": doms_values.getDeviceById(domspoint.device), + "x": str(domspoint.longitude), + "y": str(domspoint.latitude), + "point": "Point(%s %s)" % (domspoint.longitude, domspoint.latitude), + "time": datetime.strptime(domspoint.time, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC), + "fileurl": domspoint.file_url, + "id": domspoint.data_id, + "source": domspoint.source, + } + + +class DomsPoint(object): + def __init__(self, longitude=None, latitude=None, time=None, depth=None, data_id=None): + + self.time = time + self.longitude = longitude + self.latitude = latitude + self.depth = depth + self.data_id = data_id + + self.wind_u = None + self.wind_v = None + self.wind_direction = None + self.wind_speed = None + self.sst = None + self.sst_depth = None + self.sss = None + self.sss_depth = None + self.source = None + self.depth = None + self.platform = None + self.device = None + self.file_url = None + + def __repr__(self): + return str(self.__dict__) + + @staticmethod + def from_nexus_point(nexus_point, tile=None, parameter='sst'): + point = DomsPoint() + + point.data_id = "%s[%s]" % (tile.tile_id, nexus_point.index) + + # TODO Not an ideal solution; but it works for now. + if parameter == 'sst': + point.sst = nexus_point.data_val.item() + elif parameter == 'sss': + point.sss = nexus_point.data_val.item() + elif parameter == 'wind': + point.wind_u = nexus_point.data_val.item() + try: + point.wind_v = tile.meta_data['wind_v'][tuple(nexus_point.index)].item() + except (KeyError, IndexError): + pass + try: + point.wind_direction = tile.meta_data['wind_dir'][tuple(nexus_point.index)].item() + except (KeyError, IndexError): + pass + try: + point.wind_speed = tile.meta_data['wind_speed'][tuple(nexus_point.index)].item() + except (KeyError, IndexError): + pass + else: + raise NotImplementedError('%s not supported. Only sst, sss, and wind parameters are supported.' 
% parameter) + + point.longitude = nexus_point.longitude.item() + point.latitude = nexus_point.latitude.item() + + point.time = datetime.utcfromtimestamp(nexus_point.time).strftime('%Y-%m-%dT%H:%M:%SZ') + + try: + point.depth = nexus_point.depth + except KeyError: + # No depth associated with this measurement + pass + + point.sst_depth = 0 + point.source = tile.dataset + point.file_url = tile.granule + + # TODO device should change based on the satellite making the observations. + point.platform = 9 + point.device = 5 + return point + + @staticmethod + def from_edge_point(edge_point): + point = DomsPoint() + + try: + x, y = wkt.loads(edge_point['point']).coords[0] + except WKTReadingError: + try: + x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0] + except ValueError: + y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0] + + point.longitude = x + point.latitude = y + + point.time = edge_point['time'] + + point.wind_u = edge_point.get('eastward_wind') + point.wind_v = edge_point.get('northward_wind') + point.wind_direction = edge_point.get('wind_direction') + point.wind_speed = edge_point.get('wind_speed') + point.sst = edge_point.get('sea_water_temperature') + point.sst_depth = edge_point.get('sea_water_temperature_depth') + point.sss = edge_point.get('sea_water_salinity') + point.sss_depth = edge_point.get('sea_water_salinity_depth') + point.source = edge_point.get('source') + point.platform = edge_point.get('platform') + point.device = edge_point.get('device') + point.file_url = edge_point.get('fileurl') + + try: + point.data_id = unicode(edge_point['id']) + except KeyError: + point.data_id = "%s:%s:%s" % (point.time, point.longitude, point.latitude) + + return point + + +from threading import Lock + +DRIVER_LOCK = Lock() + + +def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, matchup_ds_names, parameter, depth_min, depth_max, + time_tolerance, radius_tolerance, platforms, match_once, sc=None): + from functools import partial + + with DRIVER_LOCK: + # Broadcast parameters + primary_b = sc.broadcast(primary_ds_name) + matchup_b = sc.broadcast(matchup_ds_names) + depth_min_b = sc.broadcast(float(depth_min) if depth_min is not None else None) + depth_max_b = sc.broadcast(float(depth_max) if depth_max is not None else None) + tt_b = sc.broadcast(time_tolerance) + rt_b = sc.broadcast(float(radius_tolerance)) + platforms_b = sc.broadcast(platforms) + bounding_wkt_b = sc.broadcast(bounding_wkt) + parameter_b = sc.broadcast(parameter) + + # Parallelize list of tile ids + rdd = sc.parallelize(tile_ids, determine_parllelism(len(tile_ids))) + + # Map Partitions ( list(tile_id) ) + rdd_filtered = rdd.mapPartitions( + partial(match_satellite_to_insitu, primary_b=primary_b, matchup_b=matchup_b, parameter_b=parameter_b, tt_b=tt_b, + rt_b=rt_b, platforms_b=platforms_b, bounding_wkt_b=bounding_wkt_b, depth_min_b=depth_min_b, + depth_max_b=depth_max_b), preservesPartitioning=True) \ + .filter(lambda p_m_tuple: abs( + iso_time_to_epoch(p_m_tuple[0].time) - iso_time_to_epoch(p_m_tuple[1].time)) <= time_tolerance) + + if match_once: + # Only the 'nearest' point for each primary should be returned. 
Add an extra map/reduce which calculates + # the distance and finds the minimum + + # Method used for calculating the distance between 2 DomsPoints + from pyproj import Geod + + def dist(primary, matchup): + wgs84_geod = Geod(ellps='WGS84') + lat1, lon1 = (primary.latitude, primary.longitude) + lat2, lon2 = (matchup.latitude, matchup.longitude) + az12, az21, distance = wgs84_geod.inv(lon1, lat1, lon2, lat2) + return distance + + rdd_filtered = rdd_filtered \ + .map(lambda (primary, matchup): tuple([primary, tuple([matchup, dist(primary, matchup)])])) \ + .reduceByKey(lambda match_1, match_2: match_1 if match_1[1] < match_2[1] else match_2) \ + .mapValues(lambda x: [x[0]]) + else: + rdd_filtered = rdd_filtered \ + .combineByKey(lambda value: [value], # Create 1 element list + lambda value_list, value: value_list + [value], # Add 1 element to list + lambda value_list_a, value_list_b: value_list_a + value_list_b) # Add two lists together + + result_as_map = rdd_filtered.collectAsMap() + + return result_as_map + + +def determine_parllelism(num_tiles): + """ + Try to stay at a maximum of 140 tiles per partition; But don't go over 128 partitions. + Also, don't go below the default of 8 + """ + num_partitions = max(min(num_tiles / 140, 128), 8) + return num_partitions + + +def add_meters_to_lon_lat(lon, lat, meters): + """ + Uses a simple approximation of + 1 degree latitude = 111,111 meters + 1 degree longitude = 111,111 meters * cosine(latitude) + :param lon: longitude to add meters to + :param lat: latitude to add meters to + :param meters: meters to add to the longitude and latitude values + :return: (longitude, latitude) increased by given meters + """ + longitude = lon + ((meters / 111111) * cos(radians(lat))) + latitude = lat + (meters / 111111) + + return longitude, latitude + + +def match_satellite_to_insitu(tile_ids, primary_b, matchup_b, parameter_b, tt_b, rt_b, platforms_b, + bounding_wkt_b, depth_min_b, depth_max_b): + the_time = datetime.now() + tile_ids = list(tile_ids) + if len(tile_ids) == 0: + return [] + tile_service = NexusTileService() + + # Determine the spatial temporal extents of this partition of tiles + tiles_bbox = tile_service.get_bounding_box(tile_ids) + tiles_min_time = tile_service.get_min_time(tile_ids) + tiles_max_time = tile_service.get_max_time(tile_ids) + + # Increase spatial extents by the radius tolerance + matchup_min_lon, matchup_min_lat = add_meters_to_lon_lat(tiles_bbox.bounds[0], tiles_bbox.bounds[1], + -1 * rt_b.value) + matchup_max_lon, matchup_max_lat = add_meters_to_lon_lat(tiles_bbox.bounds[2], tiles_bbox.bounds[3], rt_b.value) + + # Don't go outside of the search domain + search_min_x, search_min_y, search_max_x, search_max_y = wkt.loads(bounding_wkt_b.value).bounds + matchup_min_lon = max(matchup_min_lon, search_min_x) + matchup_min_lat = max(matchup_min_lat, search_min_y) + matchup_max_lon = min(matchup_max_lon, search_max_x) + matchup_max_lat = min(matchup_max_lat, search_max_y) + + # Find the centroid of the matchup bounding box and initialize the projections + matchup_center = box(matchup_min_lon, matchup_min_lat, matchup_max_lon, matchup_max_lat).centroid.coords[0] + aeqd_proj = pyproj.Proj(proj='aeqd', lon_0=matchup_center[0], lat_0=matchup_center[1]) + lonlat_proj = pyproj.Proj(proj='lonlat') + + # Increase temporal extents by the time tolerance + matchup_min_time = tiles_min_time - tt_b.value + matchup_max_time = tiles_max_time + tt_b.value + print "%s Time to determine spatial-temporal extents for partition %s to %s" % ( + 
str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]) + + # Query edge for all points within the spatial-temporal extents of this partition + the_time = datetime.now() + edge_session = requests.Session() + edge_results = [] + with edge_session: + for insitudata_name in matchup_b.value.split(','): + bbox = ','.join( + [str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon), str(matchup_max_lat)]) + edge_response = query_edge(insitudata_name, parameter_b.value, matchup_min_time, matchup_max_time, bbox, + platforms_b.value, depth_min_b.value, depth_max_b.value, session=edge_session) + if edge_response['totalResults'] == 0: + continue + r = edge_response['results'] + for p in r: + p['source'] = insitudata_name + edge_results.extend(r) + print "%s Time to call edge for partition %s to %s" % (str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]) + if len(edge_results) == 0: + return [] + + # Convert edge points to utm + the_time = datetime.now() + matchup_points = np.ndarray((len(edge_results), 2), dtype=np.float32) + for n, edge_point in enumerate(edge_results): + try: + x, y = wkt.loads(edge_point['point']).coords[0] + except WKTReadingError: + try: + x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0] + except ValueError: + y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0] + + matchup_points[n][0], matchup_points[n][1] = pyproj.transform(p1=lonlat_proj, p2=aeqd_proj, x=x, y=y) + print "%s Time to convert match points for partition %s to %s" % ( + str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]) + + # Build kdtree from matchup points + the_time = datetime.now() + m_tree = spatial.cKDTree(matchup_points, leafsize=30) + print "%s Time to build matchup tree" % (str(datetime.now() - the_time)) + + # The actual matching happens in the generator. 
This is so that we only load 1 tile into memory at a time + match_generators = [match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, bounding_wkt_b.value, + parameter_b.value, rt_b.value, lonlat_proj, aeqd_proj) for tile_id + in tile_ids] + + return chain(*match_generators) + + +def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, search_domain_bounding_wkt, + search_parameter, radius_tolerance, lonlat_proj, aeqd_proj): + from nexustiles.model.nexusmodel import NexusPoint + from webservice.algorithms_spark.Matchup import DomsPoint # Must import DomsPoint or Spark complains + + # Load tile + try: + the_time = datetime.now() + tile = tile_service.mask_tiles_to_polygon(wkt.loads(search_domain_bounding_wkt), + tile_service.find_tile_by_id(tile_id))[0] + print "%s Time to load tile %s" % (str(datetime.now() - the_time), tile_id) + except IndexError: + # This should only happen if all measurements in a tile become masked after applying the bounding polygon + raise StopIteration + + # Convert valid tile lat,lon tuples to UTM tuples + the_time = datetime.now() + # Get list of indices of valid values + valid_indices = tile.get_indices() + primary_points = np.array( + [pyproj.transform(p1=lonlat_proj, p2=aeqd_proj, x=tile.longitudes[aslice[2]], y=tile.latitudes[aslice[1]]) for + aslice in valid_indices]) + + print "%s Time to convert primary points for tile %s" % (str(datetime.now() - the_time), tile_id) + + a_time = datetime.now() + p_tree = spatial.cKDTree(primary_points, leafsize=30) + print "%s Time to build primary tree" % (str(datetime.now() - a_time)) + + a_time = datetime.now() + matched_indexes = p_tree.query_ball_tree(m_tree, radius_tolerance) + print "%s Time to query primary tree for tile %s" % (str(datetime.now() - a_time), tile_id) + for i, point_matches in enumerate(matched_indexes): + if len(point_matches) > 0: + p_nexus_point = NexusPoint(tile.latitudes[valid_indices[i][1]], + tile.longitudes[valid_indices[i][2]], None, + tile.times[valid_indices[i][0]], valid_indices[i], + tile.data[tuple(valid_indices[i])]) + p_doms_point = DomsPoint.from_nexus_point(p_nexus_point, tile=tile, parameter=search_parameter) + for m_point_index in point_matches: + m_doms_point = DomsPoint.from_edge_point(edge_results[m_point_index]) + yield p_doms_point, m_doms_point + + +def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, itemsPerPage=1000, + startIndex=0, stats=True, session=None): + try: + startTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%dT%H:%M:%SZ') + except TypeError: + # Assume we were passed a properly formatted string + pass + + try: + endTime = datetime.utcfromtimestamp(endTime).strftime('%Y-%m-%dT%H:%M:%SZ') + except TypeError: + # Assume we were passed a properly formatted string + pass + + try: + platform = platform.split(',') + except AttributeError: + # Assume we were passed a list + pass + + params = {"variable": variable, + "startTime": startTime, + "endTime": endTime, + "bbox": bbox, + "minDepth": depth_min, + "maxDepth": depth_max, + "platform": platform, + "itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()} + + if session is not None: + edge_request = session.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params) + else: + edge_request = requests.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params) + + edge_request.raise_for_status() + edge_response = json.loads(edge_request.text) + + # Get all edge results + 
next_page_url = edge_response.get('next', None) + while next_page_url is not None: + if session is not None: + edge_page_request = session.get(next_page_url) + else: + edge_page_request = requests.get(next_page_url) + + edge_page_request.raise_for_status() + edge_page_response = json.loads(edge_page_request.text) + + edge_response['results'].extend(edge_page_response['results']) + + next_page_url = edge_page_response.get('next', None) + + return edge_response
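
A few standalone notes on the code above follow; all are Python 2.7-compatible sketches, and any hosts, IDs, and helper names in them are illustrative, not part of this commit. The handler is mounted at /match_spark (its path attribute). A minimal sketch of invoking it over HTTP, built from the params dict it declares; the base URL is a placeholder for whatever NEXUS webservice deployment is running, and the only response behavior assumed is the documented HTTP 202 when resultSizeLimit is exceeded:

import requests

# Placeholder base URL; substitute your NEXUS webservice deployment.
NEXUS_HOST = "http://localhost:8083"

params = {
    "primary": "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1",  # primary dataset
    "matchup": "spurs",                   # comma-delimited matchup dataset(s)
    "parameter": "sst",                   # one of 'sst', 'sss', 'wind'
    "startTime": "2012-10-15T00:00:00Z",
    "endTime": "2012-10-16T00:00:00Z",
    "b": "-34.98,29.54,-30.1,31.0",       # min lon, min lat, max lon, max lat
    "platforms": "1,2,3,4,5,6,7,8,9",
    "tt": 86400,                          # time tolerance in seconds
    "rt": 1500.0,                         # radius tolerance in meters
    "matchOnce": "true",
}

r = requests.get(NEXUS_HOST + "/match_spark", params=params)
r.raise_for_status()
# 202 means the match count exceeded resultSizeLimit and the body is empty.
print(r.status_code)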
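
When matchOnce is set, the nearest candidate is chosen using the dist() helper in spark_matchup_driver, which solves the WGS84 inverse geodesic problem via pyproj. A standalone check with two of the in situ coordinates from the tests above:

from pyproj import Geod

wgs84_geod = Geod(ellps='WGS84')

# Primary point 1 and matchup point 1 from
# test_one_of_many_primary_matches_one_of_many_matchup.
lat1, lon1 = 30.42946, -33.76764
lat2, lon2 = 28.877, -33.762

# inv() returns forward azimuth, back azimuth, and distance in meters.
az12, az21, distance = wgs84_geod.inv(lon1, lat1, lon2, lat2)
print(distance)  # ~172 km, which is why this pair fails the 110 km tolerance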
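
The matchOnce branch keys each (primary, matchup) pair by the primary point, attaches the distance, and reduceByKey keeps the smaller one; mapValues then re-wraps the survivor in a list so both branches return the same shape. The same reduction in plain Python, independent of Spark (identifiers illustrative):

pairs = [  # (primary_id, (matchup_id, distance_m)), as emitted by the map step
    ("p1", ("m1", 90000.0)),
    ("p1", ("m2", 40000.0)),
    ("p2", ("m1", 70000.0)),
]

nearest = {}
for primary, match in pairs:
    # Mirrors reduceByKey(lambda m1, m2: m1 if m1[1] < m2[1] else m2)
    if primary not in nearest or match[1] < nearest[primary][1]:
        nearest[primary] = match

# Mirrors mapValues(lambda x: [x[0]]): drop the distance, wrap in a list.
print({k: [v[0]] for k, v in nearest.items()})  # {'p1': ['m2'], 'p2': ['m1']}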
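
determine_parllelism (the spelling is consistent at the definition and the call site, so it resolves) targets roughly 140 tiles per partition, clamped to the 8-128 range. A few worked values, with Python 2's integer floor division made explicit:

def determine_parallelism(num_tiles):
    # ~140 tiles per partition, but never fewer than 8
    # or more than 128 partitions.
    return max(min(num_tiles // 140, 128), 8)

print(determine_parallelism(500))    # 500 // 140 = 3    -> clamped up to 8
print(determine_parallelism(5000))   # 5000 // 140 = 35
print(determine_parallelism(50000))  # 50000 // 140 = 357 -> clamped down to 128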
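
One caveat on add_meters_to_lon_lat: a degree of longitude spans 111,111 * cos(latitude) meters, so converting meters to degrees of longitude should divide by cos(latitude). The helper multiplies instead, which shrinks the longitude padding as latitude grows (about 4x too small at 60 degrees) and could cause the widened Edge query box to miss in situ points near a partition's east/west edges. If the intent is to pad by a ground distance, a corrected sketch under the same flat-earth approximation would be:

from math import cos, radians

def add_meters_to_lon_lat(lon, lat, meters):
    # 1 degree latitude  ~ 111,111 m everywhere;
    # 1 degree longitude ~ 111,111 m * cos(latitude),
    # so meters -> degrees of longitude divides by cos(latitude).
    longitude = lon + meters / (111111.0 * cos(radians(lat)))
    latitude = lat + meters / 111111.0
    return longitude, latitude

# At 60 degrees latitude, 10 km eastward is ~0.18 degrees of longitude;
# the multiply-by-cos version would report ~0.045.
print(add_meters_to_lon_lat(0.0, 60.0, 10000.0))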
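
The heart of the spatial match: both point sets are projected into a local azimuthal equidistant (aeqd) projection centered on the matchup bounding box, so planar distances approximate true meters near the center, and one cKDTree is queried against the other with the radius tolerance. A condensed, self-contained version using the in situ coordinates from the tests (pyproj 1.x transform() style, as in the module):

import numpy as np
import pyproj
from scipy import spatial

# (lon, lat) pairs from test_one_of_many_primary_matches_one_of_many_matchup.
primary_lonlat = [(-33.76764, 30.42946), (-33.75731, 29.86216)]
matchup_lonlat = [(-33.762, 28.877), (-34.916, 28.879), (-31.121, 31.256)]

# Center the projection near the points; planar aeqd distances are then
# a good approximation of meters.
aeqd_proj = pyproj.Proj(proj='aeqd', lon_0=-33.0, lat_0=30.0)
lonlat_proj = pyproj.Proj(proj='lonlat')

def to_meters(points):
    return np.array([pyproj.transform(lonlat_proj, aeqd_proj, x, y)
                     for x, y in points])

p_tree = spatial.cKDTree(to_meters(primary_lonlat), leafsize=30)
m_tree = spatial.cKDTree(to_meters(matchup_lonlat), leafsize=30)

# For each primary point, indices of matchup points within 110 km.
print(p_tree.query_ball_tree(m_tree, 110000.0))
# [[], [0]] -- only the second primary matches, as the test asserts.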
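
Finally, DomsPoint.from_edge_point accepts the Edge 'point' field in three encodings, falling back in order: proper WKT, a bare "lon lat" pair, then "lat,lon". The same chain in isolation (shapely 1.x, which provides shapely.geos.WKTReadingError as imported by the module):

from shapely import wkt
from shapely.geometry import Point
from shapely.geos import WKTReadingError

def parse_point(raw):
    # Fallback chain as in DomsPoint.from_edge_point; returns (lon, lat).
    try:
        return wkt.loads(raw).coords[0]          # proper WKT
    except WKTReadingError:
        try:
            x, y = Point(*[float(c) for c in raw.split(' ')]).coords[0]
        except ValueError:                       # comma form is lat,lon
            y, x = Point(*[float(c) for c in raw.split(',')]).coords[0]
        return (x, y)

print(parse_point("POINT (-33.467 29.728)"))
print(parse_point("-33.467 29.728"))   # the form used in the pickle test above
print(parse_point("29.728,-33.467"))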
