http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/fetchedgeimpl.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/doms/fetchedgeimpl.py b/analysis/webservice/algorithms/doms/fetchedgeimpl.py new file mode 100644 index 0000000..d4a6c4a --- /dev/null +++ b/analysis/webservice/algorithms/doms/fetchedgeimpl.py @@ -0,0 +1,201 @@ +import json +import traceback +from datetime import datetime +from multiprocessing.pool import ThreadPool + +import requests + +import geo +import values +from webservice.webmodel import NexusProcessingException + + +def __parseDatetime(dtString): + dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ") + epoch = datetime.utcfromtimestamp(0) + time = (dt - epoch).total_seconds() * 1000.0 + return time + + +def __parseLocation(locString): + if "Point" in locString: + locString = locString[6:-1] + + if "," in locString: + latitude = float(locString.split(",")[0]) + longitude = float(locString.split(",")[1]) + else: + latitude = float(locString.split(" ")[1]) + longitude = float(locString.split(" ")[0]) + + return (latitude, longitude) + + +def __resultRawToUsable(resultdict): + resultdict["time"] = __parseDatetime(resultdict["time"]) + latitude, longitude = __parseLocation(resultdict["point"]) + + resultdict["x"] = longitude + resultdict["y"] = latitude + + if "id" not in resultdict and "metadata" in resultdict: + resultdict["id"] = resultdict["metadata"] + + resultdict["id"] = "id-%s" % resultdict["id"] + + if "device" in resultdict: + resultdict["device"] = values.getDeviceById(resultdict["device"]) + + if "platform" in resultdict: + resultdict["platform"] = values.getPlatformById(resultdict["platform"]) + + if "mission" in resultdict: + resultdict["mission"] = values.getMissionById(resultdict["mission"]) + + if "sea_surface_temperature" in resultdict: + resultdict["sea_water_temperature"] = resultdict["sea_surface_temperature"] + del resultdict["sea_surface_temperature"] + + return resultdict + + +def __fetchJson(url, params, trycount=1, maxtries=5): + if trycount > maxtries: + raise Exception("Maximum retries attempted.") + if trycount > 1: + print "Retry #", trycount + r = requests.get(url, params=params, timeout=500.000) + + print r.url + + if r.status_code != 200: + return __fetchJson(url, params, trycount + 1, maxtries) + try: + results = json.loads(r.text) + return results + except: + return __fetchJson(url, params, trycount + 1, maxtries) + + +def __doQuery(endpoint, startTime, endTime, bbox, depth_min=None, depth_max=None, itemsPerPage=10, startIndex=0, platforms=None, + pageCallback=None): + params = {"startTime": startTime, "endTime": endTime, "bbox": bbox, "itemsPerPage": itemsPerPage, + "startIndex": startIndex, "stats": "true"} + + if depth_min is not None: + params['minDepth'] = depth_min + if depth_max is not None: + params['maxDepth'] = depth_max + + if platforms is not None: + params["platform"] = platforms.split(",") + + resultsRaw = __fetchJson(endpoint["url"], params) + boundsConstrainer = geo.BoundsConstrainer(north=-90, south=90, west=180, east=-180) + + if resultsRaw["totalResults"] == 0 or len(resultsRaw["results"]) == 0: # Double-sanity check + return [], resultsRaw["totalResults"], startIndex, itemsPerPage, boundsConstrainer + + try: + results = [] + for resultdict in resultsRaw["results"]: + result = __resultRawToUsable(resultdict) + result["source"] = endpoint["name"] + 
boundsConstrainer.testCoords(north=result["y"], south=result["y"], west=result["x"], east=result["x"]) + results.append(result) + + if "stats_fields" in resultsRaw and len(resultsRaw["results"]) == 0: + stats = resultsRaw["stats_fields"] + if "lat" in stats and "lon" in stats: + boundsConstrainer.testCoords(north=stats['lat']['max'], south=stats['lat']['min'], + west=stats['lon']['min'], east=stats['lon']['max']) + + if pageCallback is not None: + pageCallback(results) + + ''' + If pageCallback was supplied, we assume this call to be asynchronous. Otherwise combine all the results data and return it. + ''' + if pageCallback is None: + return results, int(resultsRaw["totalResults"]), int(resultsRaw["startIndex"]), int( + resultsRaw["itemsPerPage"]), boundsConstrainer + else: + return [], int(resultsRaw["totalResults"]), int(resultsRaw["startIndex"]), int( + resultsRaw["itemsPerPage"]), boundsConstrainer + except: + print "Invalid or missing JSON in response." + traceback.print_exc() + raise NexusProcessingException(reason="Invalid or missing JSON in response.") + # return [], 0, startIndex, itemsPerPage, boundsConstrainer + + +def getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None): + startIndex = 0 + pageResults, totalResults, pageStartIndex, itemsPerPageR, boundsConstrainer = __doQuery(endpoint, startTime, + endTime, bbox, + depth_min, depth_max, 0, + startIndex, platforms) + return totalResults, boundsConstrainer + + +def fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, pageCallback=None): + results = [] + startIndex = 0 + + mainBoundsConstrainer = geo.BoundsConstrainer(north=-90, south=90, west=180, east=-180) + + # The first request isn't parallel so we can get the total results, forced items per page, etc... + pageResults, totalResults, pageStartIndex, itemsPerPageR, boundsConstrainer = __doQuery(endpoint, startTime, + endTime, bbox, + depth_min, depth_max, + endpoint["itemsPerPage"], + startIndex, platforms, + pageCallback) + results = results + pageResults + mainBoundsConstrainer.testOtherConstrainer(boundsConstrainer) + + pool = ThreadPool(processes=endpoint["fetchThreads"]) + mpResults = [pool.apply_async(__doQuery, args=( + endpoint, startTime, endTime, bbox, depth_min, depth_max, itemsPerPageR, x, platforms, pageCallback)) for x in + range(len(pageResults), totalResults, itemsPerPageR)] + pool.close() + pool.join() + + ''' + If pageCallback was supplied, we assume this call to be asynchronous. Otherwise combine all the results data and return it. + ''' + if pageCallback is None: + mpResults = [p.get() for p in mpResults] + for mpResult in mpResults: + results = results + mpResult[0] + mainBoundsConstrainer.testOtherConstrainer(mpResult[4]) + + return results, mainBoundsConstrainer + + +def getValues(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, placeholders=False): + results, boundsConstrainer = fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms) + + if placeholders: + trimmedResults = [] + for item in results: + depth = None + if "depth" in item: + depth = item["depth"] + if "sea_water_temperature_depth" in item: + depth = item["sea_water_temperature_depth"] + + trimmedItem = { + "x": item["x"], + "y": item["y"], + "source": item["source"], + "time": item["time"], + "device": item["device"] if "device" in item else None, + "platform": item["platform"], + "depth": depth + } + trimmedResults.append(trimmedItem) + + results = trimmedResults + + return results, boundsConstrainer
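For reference, fetch() above reduces to a compact pattern: request the first page synchronously to learn the total result count and the page size the server actually enforces, then fan the remaining offsets out across a thread pool. A minimal sketch of that pattern, assuming a conforming paged endpoint (fetch_page and fetch_all are illustrative names, not part of this module):

import requests
from multiprocessing.pool import ThreadPool


def fetch_page(url, params, start_index):
    # One page of an EDGE-style endpoint; startIndex is the record offset.
    r = requests.get(url, params=dict(params, startIndex=start_index), timeout=500.0)
    r.raise_for_status()
    body = r.json()
    return body["results"], int(body["totalResults"]), int(body["itemsPerPage"])


def fetch_all(url, params, threads=4):
    # The first page is fetched synchronously so the total count and the
    # server-enforced page size are known before fanning out, as in fetch().
    results, total, page_size = fetch_page(url, params, 0)
    if not results:
        return results
    pool = ThreadPool(processes=threads)
    pages = [pool.apply_async(fetch_page, (url, params, offset))
             for offset in range(len(results), total, page_size)]
    pool.close()
    pool.join()
    for page in pages:
        results.extend(page.get()[0])
    return results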
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/geo.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/doms/geo.py b/analysis/webservice/algorithms/doms/geo.py new file mode 100644 index 0000000..ece1788 --- /dev/null +++ b/analysis/webservice/algorithms/doms/geo.py @@ -0,0 +1,113 @@ +import math + + +MEAN_RADIUS_EARTH_METERS = 6371010.0 +EQUATORIAL_RADIUS_EARTH_METERS = 6378140.0 +POLAR_RADIUS_EARTH_METERS = 6356752.0 +FLATTENING_EARTH = 298.257223563 +MEAN_RADIUS_EARTH_MILES = 3958.8 + + +class DistanceUnit(object): + METERS = 0 + MILES = 1 + + +# Haversine implementation for great-circle distances between two points +def haversine(x0, y0, x1, y1, units=DistanceUnit.METERS): + if units == DistanceUnit.METERS: + R = MEAN_RADIUS_EARTH_METERS + elif units == DistanceUnit.MILES: + R = MEAN_RADIUS_EARTH_MILES + else: + raise Exception("Invalid units specified") + x0r = x0 * (math.pi / 180.0) # To radians + x1r = x1 * (math.pi / 180.0) # To radians + xd = (x1 - x0) * (math.pi / 180.0) + yd = (y1 - y0) * (math.pi / 180.0) + + a = math.sin(xd/2.0) * math.sin(xd/2.0) + \ + math.cos(x0r) * math.cos(x1r) * \ + math.sin(yd / 2.0) * math.sin(yd / 2.0) + c = 2.0 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a)) + d = R * c + return d + + +# Equirectangular approximation for when performance is key. Better at smaller distances +def equirectangularApprox(x0, y0, x1, y1): + R = 6371000.0 # Meters + x0r = x0 * (math.pi / 180.0) # To radians + x1r = x1 * (math.pi / 180.0) + y0r = y0 * (math.pi / 180.0) + y1r = y1 * (math.pi / 180.0) + + x = (y1r - y0r) * math.cos((x0r + x1r) / 2.0) + y = x1r - x0r + d = math.sqrt(x*x + y*y) * R + return d + + + +class BoundingBox(object): + + def __init__(self, north=None, south=None, west=None, east=None, asString=None): + if asString is not None: + bboxParts = asString.split(",") + self.west=float(bboxParts[0]) + self.south=float(bboxParts[1]) + self.east=float(bboxParts[2]) + self.north=float(bboxParts[3]) + else: + self.north = north + self.south = south + self.west = west + self.east = east + + def toString(self): + return "%s,%s,%s,%s"%(self.west, self.south, self.east, self.north) + + def toMap(self): + return { + "xmin": self.west, + "xmax": self.east, + "ymin": self.south, + "ymax": self.north + } + +''' + Constrains, does not expand. 
+''' +class BoundsConstrainer(BoundingBox): + + def __init__(self, north=None, south=None, west=None, east=None, asString=None): + BoundingBox.__init__(self, north, south, west, east, asString) + + def testNorth(self, v): + if v is None: + return + self.north = max([self.north, v]) + + def testSouth(self, v): + if v is None: + return + self.south = min([self.south, v]) + + def testEast(self, v): + if v is None: + return + self.east = max([self.east, v]) + + def testWest(self, v): + if v is None: + return + self.west = min([self.west, v]) + + def testCoords(self, north=None, south=None, west=None, east=None): + self.testNorth(north) + self.testSouth(south) + self.testWest(west) + self.testEast(east) + + def testOtherConstrainer(self, other): + self.testCoords(north=other.north, south=other.south, west=other.west, east=other.east) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/histogramplot.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/doms/histogramplot.py b/analysis/webservice/algorithms/doms/histogramplot.py new file mode 100644 index 0000000..a413769 --- /dev/null +++ b/analysis/webservice/algorithms/doms/histogramplot.py @@ -0,0 +1,117 @@ + +import BaseDomsHandler +import ResultsStorage +import string +from cStringIO import StringIO +import matplotlib.mlab as mlab + +from multiprocessing import Process, Manager + +import numpy as np +import matplotlib.pyplot as plt +import matplotlib +matplotlib.use('Agg') + + + + +PARAMETER_TO_FIELD = { + "sst": "sea_water_temperature", + "sss": "sea_water_salinity" +} + +PARAMETER_TO_UNITS = { + "sst": "($^\circ$C)", + "sss": "(g/L)" +} + +class DomsHistogramPlotQueryResults(BaseDomsHandler.DomsQueryResults): + + def __init__(self, x, parameter, primary, secondary, args=None, bounds=None, count=None, details=None, computeOptions=None, executionId=None, plot=None): + BaseDomsHandler.DomsQueryResults.__init__(self, results=x, args=args, details=details, bounds=bounds, count=count, computeOptions=computeOptions, executionId=executionId) + self.__primary = primary + self.__secondary = secondary + self.__x = x + self.__parameter = parameter + self.__plot = plot + + def toImage(self): + return self.__plot + + +def render(d, x, primary, secondary, parameter, norm_and_curve=False): + + fig, ax = plt.subplots() + fig.suptitle(string.upper("%s vs. 
%s" % (primary, secondary)), fontsize=14, fontweight='bold') + + + + n, bins, patches = plt.hist(x, 50, normed=norm_and_curve, facecolor='green', alpha=0.75) + + + if norm_and_curve: + mean = np.mean(x) + variance = np.var(x) + sigma = np.sqrt(variance) + y = mlab.normpdf(bins, mean, sigma) + l = plt.plot(bins, y, 'r--', linewidth=1) + + ax.set_title('n = %d' % len(x)) + + units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS["sst"] + ax.set_xlabel("%s - %s %s" % (primary, secondary, units)) + + if norm_and_curve: + ax.set_ylabel("Probability per unit difference") + else: + ax.set_ylabel("Frequency") + + plt.grid(True) + + sio = StringIO() + plt.savefig(sio, format='png') + d['plot'] = sio.getvalue() + + +def renderAsync(x, primary, secondary, parameter, norm_and_curve): + manager = Manager() + d = manager.dict() + p = Process(target=render, args=(d, x, primary, secondary, parameter, norm_and_curve)) + p.start() + p.join() + return d['plot'] + + +def createHistogramPlot(id, parameter, norm_and_curve=False): + + with ResultsStorage.ResultsRetrieval() as storage: + params, stats, data = storage.retrieveResults(id) + + primary = params["primary"] + secondary = params["matchup"][0] + + x = createHistTable(data, secondary, parameter) + + plot = renderAsync(x, primary, secondary, parameter, norm_and_curve) + + r = DomsHistogramPlotQueryResults(x=x, parameter=parameter, primary=primary, secondary=secondary, + args=params, details=stats, + bounds=None, count=None, computeOptions=None, executionId=id, plot=plot) + return r + + +def createHistTable(results, secondary, parameter): + + x = [] + + field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else PARAMETER_TO_FIELD["sst"] + + for entry in results: + for match in entry["matches"]: + if match["source"] == secondary: + if field in entry and field in match: + a = entry[field] + b = match[field] + x.append((a - b)) + + return x \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/insitusubset.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/doms/insitusubset.py b/analysis/webservice/algorithms/doms/insitusubset.py new file mode 100644 index 0000000..c688530 --- /dev/null +++ b/analysis/webservice/algorithms/doms/insitusubset.py @@ -0,0 +1,248 @@ +import StringIO +import csv +import json +import logging +from datetime import datetime + +import requests + +import BaseDomsHandler +from webservice.NexusHandler import nexus_handler +from webservice.algorithms.doms import config as edge_endpoints +from webservice.webmodel import NexusProcessingException, NoDataException + +ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' + + +@nexus_handler +class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryHandler): + name = "DOMS In Situ Subsetter" + path = "/domsinsitusubset" + description = "Subset a DOMS in situ source given the search domain." + + params = [ + { + "name": "source", + "type": "comma-delimited string", + "description": "The in situ Dataset to be sub-setted", + "required": "true", + "sample": "spurs" + }, + { + "name": "parameter", + "type": "string", + "description": "The parameter of interest. 
One of 'sst', 'sss', 'wind'", + "required": "false", + "default": "All", + "sample": "sss" + }, + { + "name": "startTime", + "type": "string", + "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH", + "required": "true", + "sample": "2013-10-21T00:00:00Z" + }, + { + "name": "endTime", + "type": "string", + "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH", + "required": "true", + "sample": "2013-10-31T23:59:59Z" + }, + { + "name": "b", + "type": "comma-delimited float", + "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, " + "Maximum (Eastern) Longitude, Maximum (Northern) Latitude", + "required": "true", + "sample": "-30,15,-45,30" + }, + { + "name": "depthMin", + "type": "float", + "description": "Minimum depth of measurements. Must be less than depthMax", + "required": "false", + "default": "No limit", + "sample": "0" + }, + { + "name": "depthMax", + "type": "float", + "description": "Maximum depth of measurements. Must be greater than depthMin", + "required": "false", + "default": "No limit", + "sample": "5" + }, + { + "name": "platforms", + "type": "comma-delimited integer", + "description": "Platforms to include for subset consideration", + "required": "false", + "default": "All", + "sample": "1,2,3,4,5,6,7,8,9" + }, + { + "name": "output", + "type": "string", + "description": "Output type. Only 'CSV' or 'JSON' is currently supported", + "required": "false", + "default": "JSON", + "sample": "CSV" + } + ] + singleton = True + + def __init__(self): + BaseDomsHandler.BaseDomsQueryHandler.__init__(self) + self.log = logging.getLogger(__name__) + + def parse_arguments(self, request): + # Parse input arguments + self.log.debug("Parsing arguments") + + source_name = request.get_argument('source', None) + if source_name is None or source_name.strip() == '': + raise NexusProcessingException(reason="'source' argument is required", code=400) + + parameter_s = request.get_argument('parameter', None) + if parameter_s not in ['sst', 'sss', 'wind', None]: + raise NexusProcessingException( + reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400) + + try: + start_time = request.get_start_datetime() + start_time = start_time.strftime("%Y-%m-%dT%H:%M:%SZ") + except: + raise NexusProcessingException( + reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", + code=400) + try: + end_time = request.get_end_datetime() + end_time = end_time.strftime("%Y-%m-%dT%H:%M:%SZ") + except: + raise NexusProcessingException( + reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", + code=400) + + if start_time > end_time: + raise NexusProcessingException( + reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % ( + request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)), + code=400) + + try: + bounding_polygon = request.get_bounding_polygon() + except: + raise NexusProcessingException( + reason="'b' argument is required. 
Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude", + code=400) + + depth_min = request.get_decimal_arg('depthMin', default=None) + depth_max = request.get_decimal_arg('depthMax', default=None) + + if depth_min is not None and depth_max is not None and depth_min >= depth_max: + raise NexusProcessingException( + reason="Depth Min should be less than Depth Max", code=400) + + platforms = request.get_argument('platforms', None) + if platforms is not None: + try: + p_validation = platforms.split(',') + p_validation = [int(p) for p in p_validation] + del p_validation + except: + raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400) + + return source_name, parameter_s, start_time, end_time, bounding_polygon, depth_min, depth_max, platforms + + def calc(self, request, **args): + + source_name, parameter_s, start_time, end_time, bounding_polygon, \ + depth_min, depth_max, platforms = self.parse_arguments(request) + + with requests.session() as edge_session: + edge_results = query_edge(source_name, parameter_s, start_time, end_time, + ','.join([str(bound) for bound in bounding_polygon.bounds]), + platforms, depth_min, depth_max, edge_session)['results'] + + if len(edge_results) == 0: + raise NoDataException + return InSituSubsetResult(results=edge_results) + + +class InSituSubsetResult(object): + def __init__(self, results): + self.results = results + + def toJson(self): + return json.dumps(self.results, indent=4) + + def toCSV(self): + fieldnames = sorted(next(iter(self.results)).keys()) + + csv_mem_file = StringIO.StringIO() + try: + writer = csv.DictWriter(csv_mem_file, fieldnames=fieldnames) + + writer.writeheader() + writer.writerows(self.results) + csv_out = csv_mem_file.getvalue() + finally: + csv_mem_file.close() + + return csv_out + + +def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, session, itemsPerPage=1000, + startIndex=0, stats=True): + log = logging.getLogger('webservice.algorithms.doms.insitusubset.query_edge') + try: + startTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%dT%H:%M:%SZ') + except TypeError: + # Assume we were passed a properly formatted string + pass + + try: + endTime = datetime.utcfromtimestamp(endTime).strftime('%Y-%m-%dT%H:%M:%SZ') + except TypeError: + # Assume we were passed a properly formatted string + pass + + try: + platform = platform.split(',') + except AttributeError: + # Assume we were passed a list + pass + + params = {"startTime": startTime, + "endTime": endTime, + "bbox": bbox, + "minDepth": depth_min, + "maxDepth": depth_max, + "itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()} + + if variable: + params['variable'] = variable + if platform: + params['platform'] = platform + + edge_request = session.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params) + + edge_request.raise_for_status() + edge_response = json.loads(edge_request.text) + + # Get all edge results + next_page_url = edge_response.get('next', None) + while next_page_url is not None: + log.debug("requesting %s" % next_page_url) + edge_page_request = session.get(next_page_url) + + edge_page_request.raise_for_status() + edge_page_response = json.loads(edge_page_request.text) + + edge_response['results'].extend(edge_page_response['results']) + + next_page_url = edge_page_response.get('next', None) + + return edge_response 
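query_edge() above pages differently from fetchedgeimpl: rather than computing startIndex offsets, it follows the 'next' links the EDGE service embeds in each response until they run out. Condensed to its core, assuming the same response shape (fetch_all_pages is an illustrative name):

import requests


def fetch_all_pages(url, params):
    with requests.session() as session:
        response = session.get(url, params=params)
        response.raise_for_status()
        body = response.json()
        # Walk the server-supplied pagination links, folding each page's
        # 'results' into the first response, as query_edge() does.
        next_url = body.get('next')
        while next_url is not None:
            page = session.get(next_url)
            page.raise_for_status()
            page_body = page.json()
            body['results'].extend(page_body['results'])
            next_url = page_body.get('next')
        return body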
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/mapplot.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/doms/mapplot.py b/analysis/webservice/algorithms/doms/mapplot.py new file mode 100644 index 0000000..47870a7 --- /dev/null +++ b/analysis/webservice/algorithms/doms/mapplot.py @@ -0,0 +1,171 @@ + +from mpl_toolkits.basemap import Basemap + +import BaseDomsHandler +import ResultsStorage +import numpy as np +import string +from cStringIO import StringIO + +from multiprocessing import Process, Manager + +import matplotlib.pyplot as plt +import matplotlib +matplotlib.use('Agg') + + +PARAMETER_TO_FIELD = { + "sst": "sea_water_temperature", + "sss": "sea_water_salinity" +} + +PARAMETER_TO_UNITS = { + "sst": "($^\circ$ C)", + "sss": "(g/L)" +} + + +def __square(minLon, maxLon, minLat, maxLat): + if maxLat - minLat > maxLon - minLon: + a = ((maxLat - minLat) - (maxLon - minLon)) / 2.0 + minLon -= a + maxLon += a + elif maxLon - minLon > maxLat - minLat: + a = ((maxLon - minLon) - (maxLat - minLat)) / 2.0 + minLat -= a + maxLat += a + + return minLon, maxLon, minLat, maxLat + + +def render(d, lats, lons, z, primary, secondary, parameter): + fig = plt.figure() + ax = fig.add_axes([0.1, 0.1, 0.8, 0.8]) + + + ax.set_title(string.upper("%s vs. %s" % (primary, secondary))) + # ax.set_ylabel('Latitude') + # ax.set_xlabel('Longitude') + + minLatA = np.min(lats) + maxLatA = np.max(lats) + minLonA = np.min(lons) + maxLonA = np.max(lons) + + minLat = minLatA - (abs(maxLatA - minLatA) * 0.1) + maxLat = maxLatA + (abs(maxLatA - minLatA) * 0.1) + + minLon = minLonA - (abs(maxLonA - minLonA) * 0.1) + maxLon = maxLonA + (abs(maxLonA - minLonA) * 0.1) + + minLon, maxLon, minLat, maxLat = __square(minLon, maxLon, minLat, maxLat) + + + # m = Basemap(projection='mill', llcrnrlon=-180,llcrnrlat=-80,urcrnrlon=180,urcrnrlat=80,resolution='l') + m = Basemap(projection='mill', llcrnrlon=minLon, llcrnrlat=minLat, urcrnrlon=maxLon, urcrnrlat=maxLat, + resolution='l') + + + m.drawparallels(np.arange(minLat, maxLat, (maxLat - minLat) / 5.0), labels=[1, 0, 0, 0], fontsize=10) + m.drawmeridians(np.arange(minLon, maxLon, (maxLon - minLon) / 5.0), labels=[0, 0, 0, 1], fontsize=10) + + m.drawcoastlines() + m.drawmapboundary(fill_color='#99ffff') + m.fillcontinents(color='#cc9966', lake_color='#99ffff') + + #lats, lons = np.meshgrid(lats, lons) + + masked_array = np.ma.array(z, mask=np.isnan(z)) + z = masked_array + + + values = np.zeros(len(z)) + for i in range(0, len(z)): + values[i] = ((z[i] - np.min(z)) / (np.max(z) - np.min(z)) * 20.0) + 10 + + x, y = m(lons, lats) + + im1 = m.scatter(x, y, values) + + + im1.set_array(z) + cb = m.colorbar(im1) + + units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS["sst"] + cb.set_label("Difference %s" % units) + + sio = StringIO() + plt.savefig(sio, format='png') + plot = sio.getvalue() + if d is not None: + d['plot'] = plot + return plot + + + +class DomsMapPlotQueryResults(BaseDomsHandler.DomsQueryResults): + def __init__(self, lats, lons, z, parameter, primary, secondary, args=None, bounds=None, count=None, details=None, computeOptions=None, executionId=None, plot=None): + BaseDomsHandler.DomsQueryResults.__init__(self, results={"lats": lats, "lons": lons, "values": z}, args=args, details=details, bounds=bounds, count=count, computeOptions=computeOptions, executionId=executionId) + self.__lats = lats + self.__lons 
= lons + self.__z = np.array(z) + self.__parameter = parameter + self.__primary = primary + self.__secondary = secondary + self.__plot = plot + + + + def toImage(self): + return self.__plot + + + + + +def renderAsync(x, y, z, primary, secondary, parameter): + manager = Manager() + d = manager.dict() + p = Process(target=render, args=(d, x, y, z, primary, secondary, parameter)) + p.start() + p.join() + return d['plot'] + + + +def createMapPlot(id, parameter): + + with ResultsStorage.ResultsRetrieval() as storage: + params, stats, data = storage.retrieveResults(id) + + primary = params["primary"] + secondary = params["matchup"][0] + + lats = [] + lons = [] + z = [] + + field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else PARAMETER_TO_FIELD["sst"] + + for entry in data: + for match in entry["matches"]: + if match["source"] == secondary: + + if field in entry and field in match: + a = entry[field] + b = match[field] + z.append((a - b)) + z.append((a - b)) + else: + z.append(1.0) + z.append(1.0) + lats.append(entry["y"]) + lons.append(entry["x"]) + lats.append(match["y"]) + lons.append(match["x"]) + + plot = renderAsync(lats, lons, z, primary, secondary, parameter) + r = DomsMapPlotQueryResults(lats=lats, lons=lons, z=z, parameter=parameter, primary=primary, secondary=secondary, + args=params, + details=stats, bounds=None, count=None, computeOptions=None, executionId=id, plot=plot) + return r http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/scatterplot.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/doms/scatterplot.py b/analysis/webservice/algorithms/doms/scatterplot.py new file mode 100644 index 0000000..0cbe110 --- /dev/null +++ b/analysis/webservice/algorithms/doms/scatterplot.py @@ -0,0 +1,112 @@ + +import BaseDomsHandler +import ResultsStorage +import string +from cStringIO import StringIO + +from multiprocessing import Process, Manager + +import matplotlib.pyplot as plt +import matplotlib +matplotlib.use('Agg') + + + + +PARAMETER_TO_FIELD = { + "sst": "sea_water_temperature", + "sss": "sea_water_salinity" +} + +PARAMETER_TO_UNITS = { + "sst": "($^\circ$ C)", + "sss": "(g/L)" +} + + + +def render(d, x, y, z, primary, secondary, parameter): + fig, ax = plt.subplots() + + ax.set_title(string.upper("%s vs. 
%s" % (primary, secondary))) + + units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS[ + "sst"] + ax.set_ylabel("%s %s" % (secondary, units)) + ax.set_xlabel("%s %s" % (primary, units)) + + ax.scatter(x, y) + + sio = StringIO() + plt.savefig(sio, format='png') + d['plot'] = sio.getvalue() + +class DomsScatterPlotQueryResults(BaseDomsHandler.DomsQueryResults): + + def __init__(self, x, y, z, parameter, primary, secondary, args=None, bounds=None, count=None, details=None, computeOptions=None, executionId=None, plot=None): + BaseDomsHandler.DomsQueryResults.__init__(self, results=[x, y], args=args, details=details, bounds=bounds, count=count, computeOptions=computeOptions, executionId=executionId) + self.__primary = primary + self.__secondary = secondary + self.__x = x + self.__y = y + self.__z = z + self.__parameter = parameter + self.__plot = plot + + def toImage(self): + return self.__plot + + + + +def renderAsync(x, y, z, primary, secondary, parameter): + manager = Manager() + d = manager.dict() + p = Process(target=render, args=(d, x, y, z, primary, secondary, parameter)) + p.start() + p.join() + return d['plot'] + + + +def createScatterPlot(id, parameter): + + with ResultsStorage.ResultsRetrieval() as storage: + params, stats, data = storage.retrieveResults(id) + + primary = params["primary"] + secondary = params["matchup"][0] + + x, y, z = createScatterTable(data, secondary, parameter) + + plot = renderAsync(x, y, z, primary, secondary, parameter) + + r = DomsScatterPlotQueryResults(x=x, y=y, z=z, parameter=parameter, primary=primary, secondary=secondary, + args=params, details=stats, + bounds=None, count=None, computeOptions=None, executionId=id, plot=plot) + return r + + +def createScatterTable(results, secondary, parameter): + x = [] + y = [] + z = [] + + field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else PARAMETER_TO_FIELD["sst"] + + for entry in results: + for match in entry["matches"]: + if match["source"] == secondary: + if field in entry and field in match: + a = entry[field] + b = match[field] + x.append(a) + y.append(b) + z.append(a - b) + + return x, y, z + + + + + http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/subsetter.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/doms/subsetter.py b/analysis/webservice/algorithms/doms/subsetter.py new file mode 100644 index 0000000..6727a48 --- /dev/null +++ b/analysis/webservice/algorithms/doms/subsetter.py @@ -0,0 +1,245 @@ +import logging +import os +import tempfile +import zipfile +from datetime import datetime + +import requests + +import BaseDomsHandler +from webservice.NexusHandler import nexus_handler +from webservice.webmodel import NexusProcessingException + +ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' + + +def is_blank(my_string): + return not (my_string and my_string.strip() != '') + + +@nexus_handler +class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryHandler): + name = "DOMS Subsetter" + path = "/domssubset" + description = "Subset DOMS sources given the search domain" + + params = { + "dataset": { + "name": "NEXUS Dataset", + "type": "string", + "description": "The NEXUS dataset. Optional but at least one of 'dataset' or 'insitu' are required" + }, + "insitu": { + "name": "In Situ sources", + "type": "comma-delimited string", + "description": "The in situ source(s). 
Optional but at least one of 'dataset' or 'insitu' are required" + }, + "parameter": { + "name": "Data Parameter", + "type": "string", + "description": "The parameter of interest. One of 'sst', 'sss', 'wind'. Required" + }, + "startTime": { + "name": "Start Time", + "type": "string", + "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required" + }, + "endTime": { + "name": "End Time", + "type": "string", + "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required" + }, + "b": { + "name": "Bounding box", + "type": "comma-delimited float", + "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, " + "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required" + }, + "depthMin": { + "name": "Minimum Depth", + "type": "float", + "description": "Minimum depth of measurements. Must be less than depthMax. Optional" + }, + "depthMax": { + "name": "Maximum Depth", + "type": "float", + "description": "Maximum depth of measurements. Must be greater than depthMin. Optional" + }, + "platforms": { + "name": "Platforms", + "type": "comma-delimited integer", + "description": "Platforms to include for subset consideration. Optional" + }, + "output": { + "name": "Output", + "type": "string", + "description": "Output type. Only 'ZIP' is currently supported. Required" + } + } + singleton = True + + def __init__(self): + BaseDomsHandler.BaseDomsQueryHandler.__init__(self) + self.log = logging.getLogger(__name__) + + def parse_arguments(self, request): + # Parse input arguments + self.log.debug("Parsing arguments") + + primary_ds_name = request.get_argument('dataset', None) + matchup_ds_names = request.get_argument('insitu', None) + + if is_blank(primary_ds_name) and is_blank(matchup_ds_names): + raise NexusProcessingException(reason="Either 'dataset', 'insitu', or both arguments are required", + code=400) + + if matchup_ds_names is not None: + try: + matchup_ds_names = matchup_ds_names.split(',') + except: + raise NexusProcessingException(reason="'insitu' argument should be a comma-separated list", code=400) + + parameter_s = request.get_argument('parameter', None) + if parameter_s not in ['sst', 'sss', 'wind']: + raise NexusProcessingException( + reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400) + + try: + start_time = request.get_start_datetime() + start_time = start_time.strftime("%Y-%m-%dT%H:%M:%SZ") + except: + raise NexusProcessingException( + reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", + code=400) + try: + end_time = request.get_end_datetime() + end_time = end_time.strftime("%Y-%m-%dT%H:%M:%SZ") + except: + raise NexusProcessingException( + reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", + code=400) + + if start_time > end_time: + raise NexusProcessingException( + reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % ( + request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)), + code=400) + + try: + bounding_polygon = request.get_bounding_polygon() + except: + raise NexusProcessingException( + reason="'b' argument is required. 
Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude", + code=400) + + depth_min = request.get_decimal_arg('depthMin', default=None) + depth_max = request.get_decimal_arg('depthMax', default=None) + + if depth_min is not None and depth_max is not None and depth_min >= depth_max: + raise NexusProcessingException( + reason="Depth Min should be less than Depth Max", code=400) + + platforms = request.get_argument('platforms', None) + if platforms is not None: + try: + p_validation = platforms.split(',') + p_validation = [int(p) for p in p_validation] + del p_validation + except: + raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400) + + return primary_ds_name, matchup_ds_names, parameter_s, start_time, end_time, \ + bounding_polygon, depth_min, depth_max, platforms + + def calc(self, request, **args): + + primary_ds_name, matchup_ds_names, parameter_s, start_time, end_time, \ + bounding_polygon, depth_min, depth_max, platforms = self.parse_arguments(request) + + primary_url = "https://doms.jpl.nasa.gov/datainbounds" + primary_params = { + 'ds': primary_ds_name, + 'parameter': parameter_s, + 'b': ','.join([str(bound) for bound in bounding_polygon.bounds]), + 'startTime': start_time, + 'endTime': end_time, + 'output': "CSV" + } + + matchup_url = "https://doms.jpl.nasa.gov/domsinsitusubset" + matchup_params = { + 'source': None, + 'parameter': parameter_s, + 'startTime': start_time, + 'endTime': end_time, + 'b': ','.join([str(bound) for bound in bounding_polygon.bounds]), + 'depthMin': depth_min, + 'depthMax': depth_max, + 'platforms': platforms, + 'output': 'CSV' + } + + primary_temp_file_path = None + matchup_downloads = None + + with requests.session() as session: + + if not is_blank(primary_ds_name): + # Download primary + primary_temp_file, primary_temp_file_path = tempfile.mkstemp(suffix='.csv') + download_file(primary_url, primary_temp_file_path, session, params=primary_params) + + if len(matchup_ds_names) > 0: + # Download matchup + matchup_downloads = {} + for matchup_ds in matchup_ds_names: + matchup_downloads[matchup_ds] = tempfile.mkstemp(suffix='.csv') + matchup_params['source'] = matchup_ds + download_file(matchup_url, matchup_downloads[matchup_ds][1], session, params=matchup_params) + + # Zip downloads + date_range = "%s-%s" % (datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%SZ").strftime("%Y%m%d"), + datetime.strptime(end_time, "%Y-%m-%dT%H:%M:%SZ").strftime("%Y%m%d")) + bounds = '%.4fW_%.4fS_%.4fE_%.4fN' % bounding_polygon.bounds + zip_dir = tempfile.mkdtemp() + zip_path = '%s/subset.%s.%s.zip' % (zip_dir, date_range, bounds) + with zipfile.ZipFile(zip_path, 'w') as my_zip: + if primary_temp_file_path: + my_zip.write(primary_temp_file_path, arcname='%s.%s.%s.csv' % (primary_ds_name, date_range, bounds)) + if matchup_downloads: + for matchup_ds, download in matchup_downloads.iteritems(): + my_zip.write(download[1], arcname='%s.%s.%s.csv' % (matchup_ds, date_range, bounds)) + + # Clean up + if primary_temp_file_path: + os.remove(primary_temp_file_path) + if matchup_downloads: + for matchup_ds, download in matchup_downloads.iteritems(): + os.remove(download[1]) + + return SubsetResult(zip_path) + + +class SubsetResult(object): + def __init__(self, zip_path): + self.zip_path = zip_path + + def toJson(self): + raise NotImplementedError + + def toZip(self): + with open(self.zip_path, 'rb') as zip_file: + zip_contents = 
zip_file.read() + + return zip_contents + + def cleanup(self): + os.remove(self.zip_path) + + +def download_file(url, filepath, session, params=None): + r = session.get(url, params=params, stream=True) + with open(filepath, 'wb') as f: + for chunk in r.iter_content(chunk_size=1024): + if chunk: # filter out keep-alive new chunks + f.write(chunk) http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/values.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/doms/values.py b/analysis/webservice/algorithms/doms/values.py new file mode 100644 index 0000000..5e98ce9 --- /dev/null +++ b/analysis/webservice/algorithms/doms/values.py @@ -0,0 +1,57 @@ +PLATFORMS = [ + {"id": 1, "desc": "ship"}, + {"id": 2, "desc": "moored surface buoy"}, + {"id": 3, "desc": "drifting surface float"}, + {"id": 4, "desc": "drifting subsurface profiling float"}, + {"id": 5, "desc": "autonomous underwater vehicle"}, + {"id": 6, "desc": "offshore structure"}, + {"id": 7, "desc": "coastal structure"}, + {"id": 8, "desc": "towed unmanned submersible"}, + {"id": 9, "desc": "orbiting satellite"} +] + +DEVICES = [ + {"id": 1, "desc": "bathythermographs"}, + {"id": 2, "desc": "discrete water samplers"}, + {"id": 3, "desc": "CTD"}, + {"id": 4, "desc": "Current profilers / acousticDopplerCurrentProfiler"}, + {"id": 5, "desc": "radiometers"}, + {"id": 6, "desc": "scatterometers"} +] + +MISSIONS = [ + {"id": 1, "desc": "SAMOS"}, + {"id": 2, "desc": "ICOADS"}, + {"id": 3, "desc": "Aquarius"}, + {"id": 4, "desc": "SPURS1"} +] + + +def getDescById(list, id): + for item in list: + if item["id"] == id: + return item["desc"] + return id + + +def getPlatformById(id): + return getDescById(PLATFORMS, id) + + +def getDeviceById(id): + return getDescById(DEVICES, id) + + +def getMissionById(id): + return getDescById(MISSIONS, id) + + +def getDescByListNameAndId(listName, id): + if listName.upper() == "PLATFORM": + return getPlatformById(id) + elif listName.upper() == "DEVICE": + return getDeviceById(id) + elif listName.upper() == "MISSION": + return getMissionById(id) + else: + raise Exception("Invalid list name specified ('%s')" % listName) http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/workerthread.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/doms/workerthread.py b/analysis/webservice/algorithms/doms/workerthread.py new file mode 100644 index 0000000..097e604 --- /dev/null +++ b/analysis/webservice/algorithms/doms/workerthread.py @@ -0,0 +1,51 @@ +import os +import sys +import threading + +class WorkerThread(threading.Thread): + + def __init__(self, method, params): + threading.Thread.__init__(self) + self.method = method + self.params = params + self.completed = False + self.results = None + + + def run(self): + + + self.results = self.method(*self.params) + self.completed = True + + +def __areAllComplete(threads): + + for thread in threads: + if not thread.completed: + return False + + return True + +def wait(threads, startFirst=False, poll=0.5): + + if startFirst: + for thread in threads: + thread.start() + + while not __areAllComplete(threads): + threading._sleep(poll) + + + +def foo(param1, param2): + print param1, param2 + return "c" + +if __name__ == "__main__": + + thread = WorkerThread(foo, params=("a", "b")) + thread.start() + while not thread.completed: + 
threading._sleep(0.5) + print thread.results http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/ClimMapSpark.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms_spark/ClimMapSpark.py b/analysis/webservice/algorithms_spark/ClimMapSpark.py new file mode 100644 index 0000000..77e3c38 --- /dev/null +++ b/analysis/webservice/algorithms_spark/ClimMapSpark.py @@ -0,0 +1,257 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" +import logging +from calendar import timegm, monthrange +from datetime import datetime + +import numpy as np +from nexustiles.nexustiles import NexusTileService + +from webservice.NexusHandler import nexus_handler, SparkHandler, DEFAULT_PARAMETERS_SPEC +from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException + + +@nexus_handler +class ClimMapSparkHandlerImpl(SparkHandler): + name = "Climatology Map Spark" + path = "/climMapSpark" + description = "Computes a Latitude/Longitude Time Average map for a given month given an arbitrary geographical area and year range" + params = DEFAULT_PARAMETERS_SPEC + singleton = True + + def __init__(self): + SparkHandler.__init__(self) + self.log = logging.getLogger(__name__) + # self.log.setLevel(logging.DEBUG) + + @staticmethod + def _map(tile_in_spark): + tile_bounds = tile_in_spark[0] + (min_lat, max_lat, min_lon, max_lon, + min_y, max_y, min_x, max_x) = tile_bounds + startTime = tile_in_spark[1] + endTime = tile_in_spark[2] + ds = tile_in_spark[3] + tile_service = NexusTileService() + # print 'Started tile', tile_bounds + # sys.stdout.flush() + tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1) + days_at_a_time = 90 + # days_at_a_time = 30 + # days_at_a_time = 7 + # days_at_a_time = 1 + # print 'days_at_a_time = ', days_at_a_time + t_incr = 86400 * days_at_a_time + sum_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.float64)) + cnt_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.uint32)) + t_start = startTime + while t_start <= endTime: + t_end = min(t_start + t_incr, endTime) + # t1 = time() + # print 'nexus call start at time %f' % t1 + # sys.stdout.flush() + nexus_tiles = \ + ClimMapSparkHandlerImpl.query_by_parts(tile_service, + min_lat, max_lat, + min_lon, max_lon, + ds, + t_start, + t_end, + part_dim=2) + # nexus_tiles = \ + # tile_service.get_tiles_bounded_by_box(min_lat, max_lat, + # min_lon, max_lon, + # ds=ds, + # start_time=t_start, + # end_time=t_end) + # t2 = time() + # print 'nexus call end at time %f' % t2 + # print 'secs in nexus call: ', t2-t1 + # sys.stdout.flush() + # print 't %d to %d - Got %d tiles' % (t_start, t_end, + # len(nexus_tiles)) + # for nt in nexus_tiles: + # print nt.granule + # print nt.section_spec + # print 'lat min/max:', np.ma.min(nt.latitudes), np.ma.max(nt.latitudes) + # print 'lon min/max:', np.ma.min(nt.longitudes), np.ma.max(nt.longitudes) + # sys.stdout.flush() + + for tile in nexus_tiles: + tile.data.data[:, :] = np.nan_to_num(tile.data.data) + sum_tile += tile.data.data[0, min_y:max_y + 1, min_x:max_x + 1] + cnt_tile += (~tile.data.mask[0, + min_y:max_y + 1, + min_x:max_x + 1]).astype(np.uint8) + t_start = t_end + 1 + + # print 'cnt_tile = ', cnt_tile + # cnt_tile.mask = ~(cnt_tile.data.astype(bool)) + # sum_tile.mask = cnt_tile.mask + # avg_tile = sum_tile / cnt_tile + # stats_tile = [[{'avg': avg_tile.data[y,x], 'cnt': cnt_tile.data[y,x]} for x in 
range(tile_inbounds_shape[1])] for y in range(tile_inbounds_shape[0])] + # print 'Finished tile', tile_bounds + # print 'Tile avg = ', avg_tile + # sys.stdout.flush() + return ((min_lat, max_lat, min_lon, max_lon), (sum_tile, cnt_tile)) + + def _month_from_timestamp(self, t): + return datetime.utcfromtimestamp(t).month + + def calc(self, computeOptions, **args): + """ + + :param computeOptions: StatsComputeOptions + :param args: dict + :return: + """ + + spark_master, spark_nexecs, spark_nparts = computeOptions.get_spark_cfg() + self._setQueryParams(computeOptions.get_dataset()[0], + (float(computeOptions.get_min_lat()), + float(computeOptions.get_max_lat()), + float(computeOptions.get_min_lon()), + float(computeOptions.get_max_lon())), + start_year=computeOptions.get_start_year(), + end_year=computeOptions.get_end_year(), + clim_month=computeOptions.get_clim_month(), + spark_master=spark_master, + spark_nexecs=spark_nexecs, + spark_nparts=spark_nparts) + self._startTime = timegm((self._startYear, 1, 1, 0, 0, 0)) + self._endTime = timegm((self._endYear, 12, 31, 23, 59, 59)) + + if 'CLIM' in self._ds: + raise NexusProcessingException(reason="Cannot compute Latitude/Longitude Time Average map on a climatology", + code=400) + + nexus_tiles = self._find_global_tile_set() + # print 'tiles:' + # for tile in nexus_tiles: + # print tile.granule + # print tile.section_spec + # print 'lat:', tile.latitudes + # print 'lon:', tile.longitudes + + # nexus_tiles) + if len(nexus_tiles) == 0: + raise NoDataException(reason="No data found for selected timeframe") + + self.log.debug('Found {0} tiles'.format(len(nexus_tiles))) + # for tile in nexus_tiles: + # print 'lats: ', tile.latitudes.compressed() + # print 'lons: ', tile.longitudes.compressed() + self.log.debug('Using Native resolution: lat_res={0}, lon_res={1}'.format(self._latRes, self._lonRes)) + nlats = int((self._maxLat - self._minLatCent) / self._latRes) + 1 + nlons = int((self._maxLon - self._minLonCent) / self._lonRes) + 1 + self.log.debug('nlats={0}, nlons={1}'.format(nlats, nlons)) + self.log.debug('center lat range = {0} to {1}'.format(self._minLatCent, + self._maxLatCent)) + self.log.debug('center lon range = {0} to {1}'.format(self._minLonCent, + self._maxLonCent)) + + # Create array of tuples to pass to Spark map function + nexus_tiles_spark = [[self._find_tile_bounds(t), + self._startTime, self._endTime, + self._ds] for t in nexus_tiles] + # print 'nexus_tiles_spark = ', nexus_tiles_spark + # Remove empty tiles (should have bounds set to None) + bad_tile_inds = np.where([t[0] is None for t in nexus_tiles_spark])[0] + for i in np.flipud(bad_tile_inds): + del nexus_tiles_spark[i] + num_nexus_tiles_spark = len(nexus_tiles_spark) + self.log.debug('Created {0} spark tiles'.format(num_nexus_tiles_spark)) + + # Expand Spark map tuple array by duplicating each entry N times, + # where N is the number of ways we want the time dimension carved up. + # (one partition per year in this case). + num_years = self._endYear - self._startYear + 1 + nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_years, axis=0) + self.log.debug('repeated len(nexus_tiles_spark) = {0}'.format(len(nexus_tiles_spark))) + + # Set the time boundaries for each of the Spark map tuples. + # Every Nth element in the array gets the same time bounds. 
+ spark_part_time_ranges = \ + np.repeat(np.array([[timegm((y, self._climMonth, 1, 0, 0, 0)), + timegm((y, self._climMonth, + monthrange(y, self._climMonth)[1], + 23, 59, 59))] + for y in range(self._startYear, + self._endYear + 1)]), + num_nexus_tiles_spark, + axis=0).reshape((len(nexus_tiles_spark), 2)) + self.log.debug('spark_part_time_ranges={0}'.format(spark_part_time_ranges)) + nexus_tiles_spark[:, 1:3] = spark_part_time_ranges + # print 'nexus_tiles_spark final = ' + # for i in range(len(nexus_tiles_spark)): + # print nexus_tiles_spark[i] + + # Launch Spark computations + rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts) + sum_count_part = rdd.map(self._map) + sum_count = \ + sum_count_part.combineByKey(lambda val: val, + lambda x, val: (x[0] + val[0], + x[1] + val[1]), + lambda x, y: (x[0] + y[0], x[1] + y[1])) + avg_tiles = \ + sum_count.map(lambda (bounds, (sum_tile, cnt_tile)): + (bounds, [[{'avg': (sum_tile[y, x] / cnt_tile[y, x]) + if (cnt_tile[y, x] > 0) else 0., + 'cnt': cnt_tile[y, x]} + for x in + range(sum_tile.shape[1])] + for y in + range(sum_tile.shape[0])])).collect() + + # Combine subset results to produce global map. + # + # The tiles below are NOT Nexus objects. They are tuples + # with the time avg map data and lat-lon bounding box. + a = np.zeros((nlats, nlons), dtype=np.float64, order='C') + n = np.zeros((nlats, nlons), dtype=np.uint32, order='C') + for tile in avg_tiles: + if tile is not None: + ((tile_min_lat, tile_max_lat, tile_min_lon, tile_max_lon), + tile_stats) = tile + tile_data = np.ma.array( + [[tile_stats[y][x]['avg'] for x in range(len(tile_stats[0]))] for y in range(len(tile_stats))]) + tile_cnt = np.array( + [[tile_stats[y][x]['cnt'] for x in range(len(tile_stats[0]))] for y in range(len(tile_stats))]) + tile_data.mask = ~(tile_cnt.astype(bool)) + y0 = self._lat2ind(tile_min_lat) + y1 = y0 + tile_data.shape[0] - 1 + x0 = self._lon2ind(tile_min_lon) + x1 = x0 + tile_data.shape[1] - 1 + if np.any(np.logical_not(tile_data.mask)): + self.log.debug( + 'writing tile lat {0}-{1}, lon {2}-{3}, map y {4}-{5}, map x {6}-{7}'.format(tile_min_lat, + tile_max_lat, + tile_min_lon, + tile_max_lon, y0, + y1, x0, x1)) + a[y0:y1 + 1, x0:x1 + 1] = tile_data + n[y0:y1 + 1, x0:x1 + 1] = tile_cnt + else: + self.log.debug( + 'All pixels masked in tile lat {0}-{1}, lon {2}-{3}, map y {4}-{5}, map x {6}-{7}'.format( + tile_min_lat, tile_max_lat, + tile_min_lon, tile_max_lon, + y0, y1, x0, x1)) + + # Store global map in a NetCDF file. 
+ self._create_nc_file(a, 'clmap.nc', 'val') + + # Create dict for JSON response + results = [[{'avg': a[y, x], 'cnt': int(n[y, x]), + 'lat': self._ind2lat(y), 'lon': self._ind2lon(x)} + for x in range(a.shape[1])] for y in range(a.shape[0])] + + return ClimMapSparkResults(results=results, meta={}, computeOptions=computeOptions) + + +class ClimMapSparkResults(NexusResults): + def __init__(self, results=None, meta=None, computeOptions=None): + NexusResults.__init__(self, results=results, meta=meta, stats=None, computeOptions=computeOptions) http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/CorrMapSpark.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms_spark/CorrMapSpark.py b/analysis/webservice/algorithms_spark/CorrMapSpark.py new file mode 100644 index 0000000..32f63e6 --- /dev/null +++ b/analysis/webservice/algorithms_spark/CorrMapSpark.py @@ -0,0 +1,316 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" +import json +import logging + +import numpy as np +from nexustiles.nexustiles import NexusTileService + +# from time import time +from webservice.NexusHandler import nexus_handler, SparkHandler, DEFAULT_PARAMETERS_SPEC +from webservice.webmodel import NexusProcessingException, NexusResults, NoDataException + + +@nexus_handler +class CorrMapSparkHandlerImpl(SparkHandler): + name = "Correlation Map Spark" + path = "/corrMapSpark" + description = "Computes a correlation map between two datasets given an arbitrary geographical area and time range" + params = DEFAULT_PARAMETERS_SPEC + singleton = True + + def __init__(self): + SparkHandler.__init__(self) + self.log = logging.getLogger(__name__) + # self.log.setLevel(logging.DEBUG) + + @staticmethod + def _map(tile_in): + # Unpack input + tile_bounds, start_time, end_time, ds = tile_in + (min_lat, max_lat, min_lon, max_lon, + min_y, max_y, min_x, max_x) = tile_bounds + + # Create arrays to hold intermediate results during + # correlation coefficient calculation. + tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1) + sumx_tile = np.zeros(tile_inbounds_shape, dtype=np.float64) + sumy_tile = np.zeros(tile_inbounds_shape, dtype=np.float64) + sumxx_tile = np.zeros(tile_inbounds_shape, dtype=np.float64) + sumyy_tile = np.zeros(tile_inbounds_shape, dtype=np.float64) + sumxy_tile = np.zeros(tile_inbounds_shape, dtype=np.float64) + n_tile = np.zeros(tile_inbounds_shape, dtype=np.uint32) + + # Can only retrieve some number of days worth of data from Solr + # at a time. Set desired value here. + days_at_a_time = 90 + # days_at_a_time = 30 + # days_at_a_time = 7 + # days_at_a_time = 1 + # print 'days_at_a_time = ', days_at_a_time + t_incr = 86400 * days_at_a_time + + tile_service = NexusTileService() + + # Compute the intermediate summations needed for the Pearson + # Correlation Coefficient. We use a one-pass online algorithm + # so that not all of the data needs to be kept in memory all at once. 
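+ # Per pixel the six accumulators are n, sum_x, sum_y, sum_xx, sum_yy and + # sum_xy; partial sums from different time windows combine by plain addition + # in the reducer, and the final map step recovers r as + # (sum_xy - sum_x*sum_y/n) / sqrt((sum_xx - sum_x**2/n) * (sum_yy - sum_y**2/n)).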
+ t_start = start_time + while t_start <= end_time: + t_end = min(t_start + t_incr, end_time) + # t1 = time() + # print 'nexus call start at time %f' % t1 + # sys.stdout.flush() + ds1tiles = tile_service.get_tiles_bounded_by_box(min_lat, + max_lat, + min_lon, + max_lon, + ds[0], + t_start, + t_end) + ds2tiles = tile_service.get_tiles_bounded_by_box(min_lat, + max_lat, + min_lon, + max_lon, + ds[1], + t_start, + t_end) + # t2 = time() + # print 'nexus call end at time %f' % t2 + # print 'secs in nexus call: ', t2-t1 + # sys.stdout.flush() + + len1 = len(ds1tiles) + len2 = len(ds2tiles) + # print 't %d to %d - Got %d and %d tiles' % (t_start, t_end, + # len1, len2) + # sys.stdout.flush() + i1 = 0 + i2 = 0 + time1 = 0 + time2 = 0 + while i1 < len1 and i2 < len2: + tile1 = ds1tiles[i1] + tile2 = ds2tiles[i2] + # print 'tile1.data = ',tile1.data + # print 'tile2.data = ',tile2.data + # print 'i1, i2, t1, t2 times: ', i1, i2, tile1.times[0], tile2.times[0] + assert tile1.times[0] >= time1, 'DS1 time out of order!' + assert tile2.times[0] >= time2, 'DS2 time out of order!' + time1 = tile1.times[0] + time2 = tile2.times[0] + # print 'i1=%d,i2=%d,time1=%d,time2=%d'%(i1,i2,time1,time2) + if time1 < time2: + i1 += 1 + continue + elif time2 < time1: + i2 += 1 + continue + assert (time1 == time2), \ + "Mismatched tile times %d and %d" % (time1, time2) + # print 'processing time:',time1,time2 + t1_data = tile1.data.data + t1_mask = tile1.data.mask + t2_data = tile2.data.data + t2_mask = tile2.data.mask + t1_data = np.nan_to_num(t1_data) + t2_data = np.nan_to_num(t2_data) + joint_mask = ((~t1_mask).astype(np.uint8) * + (~t2_mask).astype(np.uint8)) + # print 'joint_mask=',joint_mask + sumx_tile += (t1_data[0, min_y:max_y + 1, min_x:max_x + 1] * + joint_mask[0, min_y:max_y + 1, min_x:max_x + 1]) + # print 'sumx_tile=',sumx_tile + sumy_tile += (t2_data[0, min_y:max_y + 1, min_x:max_x + 1] * + joint_mask[0, min_y:max_y + 1, min_x:max_x + 1]) + # print 'sumy_tile=',sumy_tile + sumxx_tile += (t1_data[0, min_y:max_y + 1, min_x:max_x + 1] * + t1_data[0, min_y:max_y + 1, min_x:max_x + 1] * + joint_mask[0, min_y:max_y + 1, min_x:max_x + 1]) + # print 'sumxx_tile=',sumxx_tile + sumyy_tile += (t2_data[0, min_y:max_y + 1, min_x:max_x + 1] * + t2_data[0, min_y:max_y + 1, min_x:max_x + 1] * + joint_mask[0, min_y:max_y + 1, min_x:max_x + 1]) + # print 'sumyy_tile=',sumyy_tile + sumxy_tile += (t1_data[0, min_y:max_y + 1, min_x:max_x + 1] * + t2_data[0, min_y:max_y + 1, min_x:max_x + 1] * + joint_mask[0, min_y:max_y + 1, min_x:max_x + 1]) + # print 'sumxy_tile=',sumxy_tile + n_tile += joint_mask[0, min_y:max_y + 1, min_x:max_x + 1] + # print 'n_tile=',n_tile + i1 += 1 + i2 += 1 + t_start = t_end + 1 + + # print 'Finished tile', tile_bounds + # sys.stdout.flush() + return ((min_lat, max_lat, min_lon, max_lon), (sumx_tile, sumy_tile, + sumxx_tile, sumyy_tile, + sumxy_tile, n_tile)) + + def calc(self, computeOptions, **args): + + spark_master, spark_nexecs, spark_nparts = computeOptions.get_spark_cfg() + self._setQueryParams(computeOptions.get_dataset(), + (float(computeOptions.get_min_lat()), + float(computeOptions.get_max_lat()), + float(computeOptions.get_min_lon()), + float(computeOptions.get_max_lon())), + computeOptions.get_start_time(), + computeOptions.get_end_time(), + spark_master=spark_master, + spark_nexecs=spark_nexecs, + spark_nparts=spark_nparts) + + self.log.debug('ds = {0}'.format(self._ds)) + if not len(self._ds) == 2: + raise NexusProcessingException( + reason="Requires two datasets for comparison. 
Specify request parameter ds=Dataset_1,Dataset_2", + code=400) + if next(iter([clim for clim in self._ds if 'CLIM' in clim]), False): + raise NexusProcessingException(reason="Cannot compute correlation on a climatology", code=400) + + nexus_tiles = self._find_global_tile_set() + # print 'tiles:' + # for tile in nexus_tiles: + # print tile.granule + # print tile.section_spec + # print 'lat:', tile.latitudes + # print 'lon:', tile.longitudes + + # nexus_tiles) + if len(nexus_tiles) == 0: + raise NoDataException(reason="No data found for selected timeframe") + + self.log.debug('Found {0} tiles'.format(len(nexus_tiles))) + self.log.debug('Using Native resolution: lat_res={0}, lon_res={1}'.format(self._latRes, self._lonRes)) + nlats = int((self._maxLat - self._minLatCent) / self._latRes) + 1 + nlons = int((self._maxLon - self._minLonCent) / self._lonRes) + 1 + self.log.debug('nlats={0}, nlons={1}'.format(nlats, nlons)) + + # Create array of tuples to pass to Spark map function + nexus_tiles_spark = [[self._find_tile_bounds(t), + self._startTime, self._endTime, + self._ds] for t in nexus_tiles] + + # Remove empty tiles (should have bounds set to None) + bad_tile_inds = np.where([t[0] is None for t in nexus_tiles_spark])[0] + for i in np.flipud(bad_tile_inds): + del nexus_tiles_spark[i] + + # Expand Spark map tuple array by duplicating each entry N times, + # where N is the number of ways we want the time dimension carved up. + num_time_parts = 72 + # num_time_parts = 2 + # num_time_parts = 1 + nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, axis=0) + self.log.debug('repeated len(nexus_tiles_spark) = {0}'.format(len(nexus_tiles_spark))) + + # Set the time boundaries for each of the Spark map tuples. + # Every Nth element in the array gets the same time bounds. + spark_part_times = np.linspace(self._startTime, self._endTime + 1, + num_time_parts + 1, dtype=np.int64) + + spark_part_time_ranges = \ + np.repeat([[[spark_part_times[i], + spark_part_times[i + 1] - 1] for i in range(num_time_parts)]], + len(nexus_tiles_spark) / num_time_parts, axis=0).reshape((len(nexus_tiles_spark), 2)) + self.log.debug('spark_part_time_ranges={0}'.format(spark_part_time_ranges)) + nexus_tiles_spark[:, 1:3] = spark_part_time_ranges + # print 'nexus_tiles_spark final = ' + # for i in range(len(nexus_tiles_spark)): + # print nexus_tiles_spark[i] + + # Launch Spark computations + # print 'nexus_tiles_spark=',nexus_tiles_spark + rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts) + sum_tiles_part = rdd.map(self._map) + # print "sum_tiles_part = ",sum_tiles_part.collect() + sum_tiles = \ + sum_tiles_part.combineByKey(lambda val: val, + lambda x, val: (x[0] + val[0], + x[1] + val[1], + x[2] + val[2], + x[3] + val[3], + x[4] + val[4], + x[5] + val[5]), + lambda x, y: (x[0] + y[0], + x[1] + y[1], + x[2] + y[2], + x[3] + y[3], + x[4] + y[4], + x[5] + y[5])) + # Convert the N (pixel-wise count) array for each tile to be a + # NumPy masked array. That is the last array in the tuple of + # intermediate summation arrays. Set mask to True if count is 0. + sum_tiles = \ + sum_tiles.map(lambda (bounds, (sum_x, sum_y, sum_xx, + sum_yy, sum_xy, n)): + (bounds, (sum_x, sum_y, sum_xx, sum_yy, sum_xy, + np.ma.array(n, + mask=~(n.astype(bool)))))) + + # print 'sum_tiles = ',sum_tiles.collect() + + # For each pixel in each tile compute an array of Pearson + # correlation coefficients. The map function is called once + # per tile. 
The result of this map operation is a list of 3-tuples of + # (bounds, r, n) for each tile (r=Pearson correlation coefficient + # and n=number of input values that went into each pixel with + # any masked values not included). + corr_tiles = \ + sum_tiles.map(lambda (bounds, (sum_x, sum_y, sum_xx, sum_yy, + sum_xy, n)): + (bounds, + np.ma.array(((sum_xy - sum_x * sum_y / n) / + np.sqrt((sum_xx - sum_x * sum_x / n) * + (sum_yy - sum_y * sum_y / n))), + mask=~(n.astype(bool))), + n)).collect() + + r = np.zeros((nlats, nlons), dtype=np.float64, order='C') + n = np.zeros((nlats, nlons), dtype=np.uint32, order='C') + + # The tiles below are NOT Nexus objects. They are tuples + # with the following for each correlation map subset: + # (1) lat-lon bounding box, (2) array of correlation r values, + # and (3) array of count n values. + for tile in corr_tiles: + ((tile_min_lat, tile_max_lat, tile_min_lon, tile_max_lon), + tile_data, tile_cnt) = tile + y0 = self._lat2ind(tile_min_lat) + y1 = self._lat2ind(tile_max_lat) + x0 = self._lon2ind(tile_min_lon) + x1 = self._lon2ind(tile_max_lon) + self.log.debug( + 'writing tile lat {0}-{1}, lon {2}-{3}, map y {4}-{5}, map x {6}-{7}'.format(tile_min_lat, tile_max_lat, + tile_min_lon, tile_max_lon, + y0, y1, x0, x1)) + r[y0:y1 + 1, x0:x1 + 1] = tile_data + n[y0:y1 + 1, x0:x1 + 1] = tile_cnt + + # Store global map in a NetCDF file. + self._create_nc_file(r, 'corrmap.nc', 'r') + + # Create dict for JSON response + results = [[{'r': r[y, x], 'cnt': int(n[y, x]), + 'lat': self._ind2lat(y), 'lon': self._ind2lon(x)} + for x in range(r.shape[1])] for y in range(r.shape[0])] + + return CorrelationResults(results) + + +class CorrelationResults(NexusResults): + def __init__(self, results): + NexusResults.__init__(self) + self.results = results + + def toJson(self): + json_d = { + "stats": {}, + "meta": [None, None], + "data": self.results + } + return json.dumps(json_d, indent=4)
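The closed-form r used in corr_tiles can be sanity-checked on its own: accumulate the six sums chunk by chunk, the way _map() does per 90-day window, then compare against a direct Pearson computation. A small self-contained check, assuming only NumPy:

import numpy as np

rng = np.random.RandomState(0)
x = rng.rand(1000)
y = 0.7 * x + 0.3 * rng.rand(1000)

# Accumulate the same six sums _map() builds per pixel, one chunk at a time.
sum_x = sum_y = sum_xx = sum_yy = sum_xy = n = 0.0
for i in range(0, x.size, 90):  # chunked like the 90-day query windows
    xs, ys = x[i:i + 90], y[i:i + 90]
    sum_x += xs.sum()
    sum_y += ys.sum()
    sum_xx += (xs * xs).sum()
    sum_yy += (ys * ys).sum()
    sum_xy += (xs * ys).sum()
    n += xs.size

r = (sum_xy - sum_x * sum_y / n) / \
    np.sqrt((sum_xx - sum_x * sum_x / n) * (sum_yy - sum_y * sum_y / n))
print(r, np.corrcoef(x, y)[0, 1])  # the two values agree to floating-point precision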
