http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/StandardDeviationSearch.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/StandardDeviationSearch.py b/analysis/webservice/algorithms/StandardDeviationSearch.py new file mode 100644 index 0000000..002a45d --- /dev/null +++ b/analysis/webservice/algorithms/StandardDeviationSearch.py @@ -0,0 +1,191 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" +import json +import logging +from datetime import datetime +from functools import partial + +from nexustiles.nexustiles import NexusTileServiceException +from pytz import timezone + +from webservice.NexusHandler import NexusHandler, nexus_handler +from webservice.webmodel import NexusProcessingException, CustomEncoder + +SENTINEL = 'STOP' +EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) + + +@nexus_handler +class StandardDeviationSearchHandlerImpl(NexusHandler): + name = "Standard Deviation Search" + path = "/standardDeviation" + description = "Retrieves the pixel standard deviation if it exists for a given longitude and latitude" + params = { + "ds": { + "name": "Dataset", + "type": "string", + "description": "One or more comma-separated dataset shortnames. Required." + }, + "longitude": { + "name": "Longitude", + "type": "float", + "description": "Longitude in degrees from -180 to 180. Required." + }, + "latitude": { + "name": "Latitude", + "type": "float", + "description": "Latitude in degrees from -90 to 90. Required." + }, + "day": { + "name": "Day of Year", + "type": "int", + "description": "Day of year to search from 0 to 365. One of day or date are required but not both." + }, + "date": { + "name": "Date", + "type": "string", + "description": "Datetime in format YYYY-MM-DDTHH:mm:ssZ or seconds since epoch (Jan 1st, 1970). One of day " + "or date are required but not both." + }, + "allInTile": { + "name": "Get all Standard Deviations in Tile", + "type": "boolean", + "description": "Optional True/False flag. If true, return the standard deviations for every pixel in the " + "tile that contains the searched lon/lat point. If false, return the " + "standard deviation only for the searched lon/lat point. Default: True" + } + + } + singleton = True + + def __init__(self): + NexusHandler.__init__(self) + self.log = logging.getLogger(__name__) + + def parse_arguments(self, request): + # Parse input arguments + self.log.debug("Parsing arguments") + try: + ds = request.get_dataset()[0] + except: + raise NexusProcessingException(reason="'ds' argument is required", code=400) + + try: + longitude = float(request.get_decimal_arg("longitude", default=None)) + except: + raise NexusProcessingException(reason="'longitude' argument is required", code=400) + + try: + latitude = float(request.get_decimal_arg("latitude", default=None)) + except: + raise NexusProcessingException(reason="'latitude' argument is required", code=400) + + search_datetime = request.get_datetime_arg('date', default=None) + day_of_year = request.get_int_arg('day', default=None) + if (search_datetime is not None and day_of_year is not None) \ + or (search_datetime is None and day_of_year is None): + raise NexusProcessingException( + reason="At least one of 'day' or 'date' arguments are required but not both.", + code=400) + + if search_datetime is not None: + day_of_year = search_datetime.timetuple().tm_yday + + return_all = request.get_boolean_arg("allInTile", default=True) + + return ds, longitude, latitude, day_of_year, return_all + + def calc(self, request, **args): + raw_args_dict = {k: request.get_argument(k) for k in request.requestHandler.request.arguments} + ds, longitude, latitude, day_of_year, return_all = self.parse_arguments(request) + + if return_all: + func = partial(get_all_std_dev, tile_service=self._tile_service, ds=ds, longitude=longitude, + latitude=latitude, day_of_year=day_of_year) + else: + func = partial(get_single_std_dev, tile_service=self._tile_service, ds=ds, longitude=longitude, + latitude=latitude, day_of_year=day_of_year) + + try: + results = StandardDeviationSearchHandlerImpl.to_result_dict(func()) + except (NoTileException, NoStandardDeviationException): + return StandardDeviationSearchResult(raw_args_dict, []) + + return StandardDeviationSearchResult(raw_args_dict, results) + + @staticmethod + def to_result_dict(list_of_tuples): + # list_of_tuples = [(lon, lat, st_dev)] + return [ + { + "longitude": lon, + "latitude": lat, + "standard_deviation": st_dev + } for lon, lat, st_dev in list_of_tuples] + + +class NoTileException(Exception): + pass + + +class NoStandardDeviationException(Exception): + pass + + +def find_tile_and_std_name(tile_service, ds, longitude, latitude, day_of_year): + from shapely.geometry import Point + point = Point(longitude, latitude) + + try: + tile = tile_service.find_tile_by_polygon_and_most_recent_day_of_year(point, ds, day_of_year)[0] + except NexusTileServiceException: + raise NoTileException + + # Check if this tile has any meta data that ends with 'std'. If it doesn't, just return nothing. + try: + st_dev_meta_name = next(iter([key for key in tile.meta_data.keys() if key.endswith('std')])) + except StopIteration: + raise NoStandardDeviationException + + return tile, st_dev_meta_name + + +def get_single_std_dev(tile_service, ds, longitude, latitude, day_of_year): + from scipy.spatial import distance + + tile, st_dev_meta_name = find_tile_and_std_name(tile_service, ds, longitude, latitude, day_of_year) + + # Need to find the closest point in the tile to the input lon/lat point and return only that result + valid_indices = tile.get_indices() + tile_points = [tuple([tile.longitudes[lon_idx], tile.latitudes[lat_idx]]) for time_idx, lat_idx, lon_idx in + valid_indices] + closest_point_index = distance.cdist([(longitude, latitude)], tile_points).argmin() + closest_lon, closest_lat = tile_points[closest_point_index] + closest_point_tile_index = tuple(valid_indices[closest_point_index]) + std_at_point = tile.meta_data[st_dev_meta_name][closest_point_tile_index] + return [tuple([closest_lon, closest_lat, std_at_point])] + + +def get_all_std_dev(tile_service, ds, longitude, latitude, day_of_year): + tile, st_dev_meta_name = find_tile_and_std_name(tile_service, ds, longitude, latitude, day_of_year) + + valid_indices = tile.get_indices() + return [tuple([tile.longitudes[lon_idx], tile.latitudes[lat_idx], + tile.meta_data[st_dev_meta_name][time_idx, lat_idx, lon_idx]]) for time_idx, lat_idx, lon_idx in + valid_indices] + + +class StandardDeviationSearchResult(object): + def __init__(self, request_params, results): + self.request_params = request_params + self.results = results + + def toJson(self): + data = { + 'meta': self.request_params, + 'data': self.results, + 'stats': {} + } + return json.dumps(data, indent=4, cls=CustomEncoder)
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/TestInitializer.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/TestInitializer.py b/analysis/webservice/algorithms/TestInitializer.py new file mode 100644 index 0000000..3f47c6a --- /dev/null +++ b/analysis/webservice/algorithms/TestInitializer.py @@ -0,0 +1,16 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" + +from webservice.NexusHandler import NexusHandler, nexus_initializer +from nexustiles.nexustiles import NexusTileService + +@nexus_initializer +class TestInitializer: + + def __init__(self): + pass + + def init(self, config): + print "*** TEST INITIALIZATION ***" \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/TileSearch.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/TileSearch.py b/analysis/webservice/algorithms/TileSearch.py new file mode 100644 index 0000000..d1fade8 --- /dev/null +++ b/analysis/webservice/algorithms/TileSearch.py @@ -0,0 +1,71 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" +from webservice.NexusHandler import NexusHandler, nexus_handler +from webservice.webmodel import NexusResults + + +# @nexus_handler +class ChunkSearchHandlerImpl(NexusHandler): + name = "Data Tile Search" + path = "/tiles" + description = "Lists dataset tiles given a geographical area and time range" + singleton = True + params = { + "ds": { + "name": "Dataset", + "type": "string", + "description": "One dataset shortname" + }, + "minLat": { + "name": "Minimum Latitude", + "type": "float", + "description": "Minimum (Southern) bounding box Latitude" + }, + "maxLat": { + "name": "Maximum Latitude", + "type": "float", + "description": "Maximum (Northern) bounding box Latitude" + }, + "minLon": { + "name": "Minimum Longitude", + "type": "float", + "description": "Minimum (Western) bounding box Longitude" + }, + "maxLon": { + "name": "Maximum Longitude", + "type": "float", + "description": "Maximum (Eastern) bounding box Longitude" + }, + "startTime": { + "name": "Start Time", + "type": "long integer", + "description": "Starting time in seconds since midnight Jan. 1st, 1970 UTC" + }, + "endTime": { + "name": "End Time", + "type": "long integer", + "description": "Ending time in seconds since midnight Jan. 1st, 1970 UTC" + } + } + + def __init__(self): + NexusHandler.__init__(self, skipCassandra=True) + + def calc(self, computeOptions, **args): + minLat = computeOptions.get_min_lat() + maxLat = computeOptions.get_max_lat() + minLon = computeOptions.get_min_lon() + maxLon = computeOptions.get_max_lon() + ds = computeOptions.get_dataset()[0] + startTime = computeOptions.get_start_time() + endTime = computeOptions.get_end_time() + # TODO update to expect tile objects back + res = [tile.get_summary() for tile in + self._tile_service.find_tiles_in_box(minLat, maxLat, minLon, maxLon, ds, startTime, endTime, + fetch_data=False)] + + res = NexusResults(results=res) + res.extendMeta(minLat, maxLat, minLon, maxLon, ds, startTime, endTime) + return res http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/TimeAvgMap.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/TimeAvgMap.py b/analysis/webservice/algorithms/TimeAvgMap.py new file mode 100644 index 0000000..5c76604 --- /dev/null +++ b/analysis/webservice/algorithms/TimeAvgMap.py @@ -0,0 +1,265 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" +# distutils: include_dirs = /usr/local/lib/python2.7/site-packages/cassandra +import pyximport + +pyximport.install() + +import sys +import numpy as np +from time import time +from webservice.NexusHandler import NexusHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC +from webservice.webmodel import NexusResults, NoDataException +from netCDF4 import Dataset + +#from mpl_toolkits.basemap import Basemap + + +# @nexus_handler +class TimeAvgMapHandlerImpl(NexusHandler): + + name = "Time Average Map" + path = "/timeAvgMap" + description = "Computes a Latitude/Longitude Time Average plot given an arbitrary geographical area and time range" + params = DEFAULT_PARAMETERS_SPEC + singleton = True + + def __init__(self): + NexusHandler.__init__(self, skipCassandra=False) + + def _find_native_resolution(self): + # Get a quick set of tiles (1 degree at center of box) at 1 time stamp + midLat = (self._minLat+self._maxLat)/2 + midLon = (self._minLon+self._maxLon)/2 + ntiles = 0 + t = self._endTime + t_incr = 86400 + while ntiles == 0: + nexus_tiles = self._tile_service.get_tiles_bounded_by_box(midLat-0.5, midLat+0.5, midLon-0.5, midLon+0.5, ds=self._ds, start_time=t-t_incr, end_time=t) + ntiles = len(nexus_tiles) + print 'find_native_res: got %d tiles' % len(nexus_tiles) + sys.stdout.flush() + lat_res = 0. + lon_res = 0. + if ntiles > 0: + for tile in nexus_tiles: + if lat_res < 1e-10: + lats = tile.latitudes.compressed() + if (len(lats) > 1): + lat_res = lats[1] - lats[0] + if lon_res < 1e-10: + lons = tile.longitudes.compressed() + if (len(lons) > 1): + lon_res = lons[1] - lons[0] + if (lat_res >= 1e-10) and (lon_res >= 1e-10): + break + if (lat_res < 1e-10) or (lon_res < 1e-10): + t -= t_incr + + self._latRes = lat_res + self._lonRes = lon_res + + def _find_global_tile_set(self): + ntiles = 0 + t = self._endTime + t_incr = 86400 + while ntiles == 0: + nexus_tiles = self._tile_service.get_tiles_bounded_by_box(self._minLat, self._maxLat, self._minLon, self._maxLon, ds=self._ds, start_time=t-t_incr, end_time=t) + ntiles = len(nexus_tiles) + print 'find_global_tile_set got %d tiles' % ntiles + sys.stdout.flush() + t -= t_incr + return nexus_tiles + + def _prune_tiles(self, nexus_tiles): + del_ind = np.where([np.all(tile.data.mask) for tile in nexus_tiles])[0] + for i in np.flipud(del_ind): + del nexus_tiles[i] + + #@staticmethod + #def _map(tile_in): + def _map(self, tile_in): + print 'Started tile %s' % tile_in.section_spec + print 'tile lats = ', tile_in.latitudes + print 'tile lons = ', tile_in.longitudes + print 'tile = ', tile_in.data + sys.stdout.flush() + lats = tile_in.latitudes + lons = tile_in.longitudes + if len(lats > 0) and len(lons > 0): + min_lat = np.ma.min(lats) + max_lat = np.ma.max(lats) + min_lon = np.ma.min(lons) + max_lon = np.ma.max(lons) + good_inds_lat = np.where(lats.mask == False) + good_inds_lon = np.where(lons.mask == False) + min_y = np.min(good_inds_lat) + max_y = np.max(good_inds_lat) + min_x = np.min(good_inds_lon) + max_x = np.max(good_inds_lon) + tile_inbounds_shape = (max_y-min_y+1, max_x-min_x+1) + days_at_a_time = 90 + t_incr = 86400 * days_at_a_time + avg_tile = np.ma.array(np.zeros(tile_inbounds_shape, + dtype=np.float64)) + cnt_tile = np.ma.array(np.zeros(tile_inbounds_shape, + dtype=np.uint32)) + t_start = self._startTime + while t_start <= self._endTime: + t_end = min(t_start+t_incr,self._endTime) + t1 = time() + print 'nexus call start at time %f' % t1 + sys.stdout.flush() + nexus_tiles = self._tile_service.get_tiles_bounded_by_box(min_lat-self._latRes/2, max_lat+self._latRes/2, min_lon-self._lonRes/2, max_lon+self._lonRes/2, ds=self._ds, start_time=t_start, end_time=t_end) + t2 = time() + print 'nexus call end at time %f' % t2 + print 'secs in nexus call: ', t2-t1 + sys.stdout.flush() + self._prune_tiles(nexus_tiles) + print 't %d to %d - Got %d tiles' % (t_start, t_end, + len(nexus_tiles)) + sys.stdout.flush() + for tile in nexus_tiles: + tile.data.data[:,:] = np.nan_to_num(tile.data.data) + avg_tile.data[:,:] += tile.data[0, + min_y:max_y+1, + min_x:max_x+1] + cnt_tile.data[:,:] += (~tile.data.mask[0, + min_y:max_y+1, + min_x:max_x+1]).astype(np.uint8) + t_start = t_end + 1 + + print 'cnt_tile = ', cnt_tile + cnt_tile.mask = ~(cnt_tile.data.astype(bool)) + avg_tile.mask = cnt_tile.mask + avg_tile /= cnt_tile + print 'Finished tile %s' % tile_in.section_spec + print 'Tile avg = ', avg_tile + sys.stdout.flush() + else: + avg_tile = None + min_lat = None + max_lat = None + min_lon = None + max_lon = None + print 'Tile %s outside of bounding box' % tile_in.section_spec + sys.stdout.flush() + return (avg_tile,min_lat,max_lat,min_lon,max_lon) + + def _lat2ind(self,lat): + return int((lat-self._minLatCent)/self._latRes) + + def _lon2ind(self,lon): + return int((lon-self._minLonCent)/self._lonRes) + + def _create_nc_file(self, a): + print 'a=',a + print 'shape a = ', a.shape + sys.stdout.flush() + lat_dim, lon_dim = a.shape + rootgrp = Dataset("tam.nc", "w", format="NETCDF4") + rootgrp.createDimension("lat", lat_dim) + rootgrp.createDimension("lon", lon_dim) + rootgrp.createVariable("TRMM_3B42_daily_precipitation_V7", "f4", + dimensions=("lat","lon",)) + rootgrp.createVariable("lat", "f4", dimensions=("lat",)) + rootgrp.createVariable("lon", "f4", dimensions=("lon",)) + rootgrp.variables["TRMM_3B42_daily_precipitation_V7"][:,:] = a + rootgrp.variables["lat"][:] = np.linspace(self._minLatCent, + self._maxLatCent, lat_dim) + rootgrp.variables["lon"][:] = np.linspace(self._minLonCent, + self._maxLonCent, lon_dim) + rootgrp.close() + + def calc(self, computeOptions, **args): + """ + + :param computeOptions: StatsComputeOptions + :param args: dict + :return: + """ + + self._minLat = float(computeOptions.get_min_lat()) + self._maxLat = float(computeOptions.get_max_lat()) + self._minLon = float(computeOptions.get_min_lon()) + self._maxLon = float(computeOptions.get_max_lon()) + self._ds = computeOptions.get_dataset()[0] + self._startTime = computeOptions.get_start_time() + self._endTime = computeOptions.get_end_time() + + self._find_native_resolution() + print 'Using Native resolution: lat_res=%f, lon_res=%f' % (self._latRes, self._lonRes) + self._minLatCent = self._minLat + self._latRes / 2 + self._minLonCent = self._minLon + self._lonRes / 2 + nlats = int((self._maxLat-self._minLatCent)/self._latRes)+1 + nlons = int((self._maxLon-self._minLonCent)/self._lonRes)+1 + self._maxLatCent = self._minLatCent + (nlats-1) * self._latRes + self._maxLonCent = self._minLonCent + (nlons-1) * self._lonRes + print 'nlats=',nlats,'nlons=',nlons + print 'center lat range = %f to %f' % (self._minLatCent, + self._maxLatCent) + print 'center lon range = %f to %f' % (self._minLonCent, + self._maxLonCent) + sys.stdout.flush() + a = np.zeros((nlats, nlons),dtype=np.float64,order='C') + + nexus_tiles = self._find_global_tile_set() + # print 'tiles:' + # for tile in nexus_tiles: + # print tile.granule + # print tile.section_spec + # print 'lat:', tile.latitudes + # print 'lon:', tile.longitudes + + # nexus_tiles) + if len(nexus_tiles) == 0: + raise NoDataException(reason="No data found for selected timeframe") + + print 'Initially found %d tiles' % len(nexus_tiles) + sys.stdout.flush() + self._prune_tiles(nexus_tiles) + print 'Pruned to %d tiles' % len(nexus_tiles) + sys.stdout.flush() + #for tile in nexus_tiles: + # print 'lats: ', tile.latitudes.compressed() + # print 'lons: ', tile.longitudes.compressed() + + avg_tiles = map(self._map, nexus_tiles) + print 'shape a = ', a.shape + sys.stdout.flush() + # The tiles below are NOT Nexus objects. They are tuples + # with the time avg map data and lat-lon bounding box. + for tile in avg_tiles: + if tile is not None: + (tile_data, tile_min_lat, tile_max_lat, + tile_min_lon, tile_max_lon) = tile + print 'shape tile_data = ', tile_data.shape + print 'tile data mask = ', tile_data.mask + sys.stdout.flush() + if np.any(np.logical_not(tile_data.mask)): + y0 = self._lat2ind(tile_min_lat) + y1 = self._lat2ind(tile_max_lat) + x0 = self._lon2ind(tile_min_lon) + x1 = self._lon2ind(tile_max_lon) + print 'writing tile lat %f-%f, lon %f-%f, map y %d-%d, map x %d-%d' % \ + (tile_min_lat, tile_max_lat, + tile_min_lon, tile_max_lon, y0, y1, x0, x1) + sys.stdout.flush() + a[y0:y1+1,x0:x1+1] = tile_data + else: + print 'All pixels masked in tile lat %f-%f, lon %f-%f, map y %d-%d, map x %d-%d' % \ + (tile_min_lat, tile_max_lat, + tile_min_lon, tile_max_lon, y0, y1, x0, x1) + sys.stdout.flush() + + self._create_nc_file(a) + + return TimeAvgMapResults(results={}, meta={}, computeOptions=computeOptions) + + +class TimeAvgMapResults(NexusResults): + + def __init__(self, results=None, meta=None, computeOptions=None): + NexusResults.__init__(self, results=results, meta=meta, stats=None, computeOptions=computeOptions) http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/TimeSeries.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/TimeSeries.py b/analysis/webservice/algorithms/TimeSeries.py new file mode 100644 index 0000000..858fea4 --- /dev/null +++ b/analysis/webservice/algorithms/TimeSeries.py @@ -0,0 +1,549 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" +import calendar +import logging +import traceback +from cStringIO import StringIO +from datetime import datetime +from multiprocessing.dummy import Pool, Manager + +import matplotlib.dates as mdates +import matplotlib.pyplot as plt +import numpy as np +import pytz +import shapely.geometry +import shapely.wkt +from backports.functools_lru_cache import lru_cache +from nexustiles.nexustiles import NexusTileService +from pytz import timezone +from scipy import stats + +from webservice import Filtering as filtering +from webservice.NexusHandler import NexusHandler, nexus_handler +from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException + +SENTINEL = 'STOP' +EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) +ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' + + +@nexus_handler +class TimeSeriesHandlerImpl(NexusHandler): + name = "Time Series" + path = "/stats" + description = "Computes a time series plot between one or more datasets given an arbitrary geographical area and time range" + params = { + "ds": { + "name": "Dataset", + "type": "comma-delimited string", + "description": "The dataset(s) Used to generate the Time Series. Required" + }, + "startTime": { + "name": "Start Time", + "type": "string", + "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required" + }, + "endTime": { + "name": "End Time", + "type": "string", + "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required" + }, + "b": { + "name": "Bounding box", + "type": "comma-delimited float", + "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, " + "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required" + }, + "seasonalFilter": { + "name": "Compute Seasonal Cycle Filter", + "type": "boolean", + "description": "Flag used to specify if the seasonal averages should be computed during " + "Time Series computation. Optional (Default: True)" + }, + "lowPassFilter": { + "name": "Compute Low Pass Filter", + "type": "boolean", + "description": "Flag used to specify if a low pass filter should be computed during " + "Time Series computation. Optional (Default: True)" + } + } + singleton = True + + def __init__(self): + NexusHandler.__init__(self) + self.log = logging.getLogger(__name__) + + def parse_arguments(self, request): + # Parse input arguments + self.log.debug("Parsing arguments") + + try: + ds = request.get_dataset() + if type(ds) != list and type(ds) != tuple: + ds = (ds,) + except: + raise NexusProcessingException( + reason="'ds' argument is required. Must be comma-delimited string", + code=400) + + # Do not allow time series on Climatology + if next(iter([clim for clim in ds if 'CLIM' in clim]), False): + raise NexusProcessingException(reason="Cannot compute time series on a climatology", code=400) + + try: + bounding_polygon = request.get_bounding_polygon() + request.get_min_lon = lambda: bounding_polygon.bounds[0] + request.get_min_lat = lambda: bounding_polygon.bounds[1] + request.get_max_lon = lambda: bounding_polygon.bounds[2] + request.get_max_lat = lambda: bounding_polygon.bounds[3] + except: + try: + west, south, east, north = request.get_min_lon(), request.get_min_lat(), \ + request.get_max_lon(), request.get_max_lat() + bounding_polygon = shapely.geometry.Polygon( + [(west, south), (east, south), (east, north), (west, north), (west, south)]) + except: + raise NexusProcessingException( + reason="'b' argument is required. Must be comma-delimited float formatted as " + "Minimum (Western) Longitude, Minimum (Southern) Latitude, " + "Maximum (Eastern) Longitude, Maximum (Northern) Latitude", + code=400) + + try: + start_time = request.get_start_datetime() + except: + raise NexusProcessingException( + reason="'startTime' argument is required. Can be int value seconds from epoch or " + "string format YYYY-MM-DDTHH:mm:ssZ", + code=400) + try: + end_time = request.get_end_datetime() + except: + raise NexusProcessingException( + reason="'endTime' argument is required. Can be int value seconds from epoch or " + "string format YYYY-MM-DDTHH:mm:ssZ", + code=400) + + if start_time > end_time: + raise NexusProcessingException( + reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % ( + request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)), + code=400) + + apply_seasonal_cycle_filter = request.get_apply_seasonal_cycle_filter() + apply_low_pass_filter = request.get_apply_low_pass_filter() + + start_seconds_from_epoch = long((start_time - EPOCH).total_seconds()) + end_seconds_from_epoch = long((end_time - EPOCH).total_seconds()) + + return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \ + apply_seasonal_cycle_filter, apply_low_pass_filter + + def calc(self, request, **args): + """ + + :param request: StatsComputeOptions + :param args: dict + :return: + """ + + ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \ + apply_seasonal_cycle_filter, apply_low_pass_filter = self.parse_arguments(request) + + resultsRaw = [] + + for shortName in ds: + results, meta = self.getTimeSeriesStatsForBoxSingleDataSet(bounding_polygon, + shortName, + start_seconds_from_epoch, + end_seconds_from_epoch, + apply_seasonal_cycle_filter=apply_seasonal_cycle_filter, + apply_low_pass_filter=apply_low_pass_filter) + resultsRaw.append([results, meta]) + + the_time = datetime.now() + results = self._mergeResults(resultsRaw) + + if len(ds) == 2: + try: + stats = TimeSeriesHandlerImpl.calculate_comparison_stats(results) + except Exception: + stats = {} + tb = traceback.format_exc() + self.log.warn("Error when calculating comparison stats:\n%s" % tb) + else: + stats = {} + + meta = [] + for singleRes in resultsRaw: + meta.append(singleRes[1]) + + res = TimeSeriesResults(results=results, meta=meta, stats=stats, + computeOptions=None, minLat=bounding_polygon.bounds[1], + maxLat=bounding_polygon.bounds[3], minLon=bounding_polygon.bounds[0], + maxLon=bounding_polygon.bounds[2], ds=ds, startTime=start_seconds_from_epoch, + endTime=end_seconds_from_epoch) + + self.log.info("Merging results and calculating comparisons took %s" % (str(datetime.now() - the_time))) + return res + + def getTimeSeriesStatsForBoxSingleDataSet(self, bounding_polygon, ds, start_seconds_from_epoch, + end_seconds_from_epoch, + apply_seasonal_cycle_filter=True, apply_low_pass_filter=True): + + the_time = datetime.now() + daysinrange = self._tile_service.find_days_in_range_asc(bounding_polygon.bounds[1], + bounding_polygon.bounds[3], + bounding_polygon.bounds[0], + bounding_polygon.bounds[2], + ds, + start_seconds_from_epoch, + end_seconds_from_epoch) + self.log.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), ds)) + + if len(daysinrange) == 0: + raise NoDataException(reason="No data found for selected timeframe") + + the_time = datetime.now() + maxprocesses = int(self.algorithm_config.get("multiprocessing", "maxprocesses")) + + results = [] + if maxprocesses == 1: + calculator = TimeSeriesCalculator() + for dayinseconds in daysinrange: + result = calculator.calc_average_on_day(bounding_polygon.wkt, ds, dayinseconds) + results += [result] if result else [] + else: + # Create a task to calc average difference for each day + manager = Manager() + work_queue = manager.Queue() + done_queue = manager.Queue() + for dayinseconds in daysinrange: + work_queue.put( + ('calc_average_on_day', bounding_polygon.wkt, ds, dayinseconds)) + [work_queue.put(SENTINEL) for _ in xrange(0, maxprocesses)] + + # Start new processes to handle the work + pool = Pool(maxprocesses) + [pool.apply_async(pool_worker, (work_queue, done_queue)) for _ in xrange(0, maxprocesses)] + pool.close() + + # Collect the results as [(day (in ms), average difference for that day)] + for i in xrange(0, len(daysinrange)): + result = done_queue.get() + try: + error_str = result['error'] + self.log.error(error_str) + raise NexusProcessingException(reason="Error calculating average by day.") + except KeyError: + pass + + results += [result] if result else [] + + pool.terminate() + manager.shutdown() + + results = sorted(results, key=lambda entry: entry["time"]) + self.log.info("Time series calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds)) + + if apply_seasonal_cycle_filter: + the_time = datetime.now() + for result in results: + month = datetime.utcfromtimestamp(result['time']).month + month_mean, month_max, month_min = self.calculate_monthly_average(month, bounding_polygon.wkt, ds) + seasonal_mean = result['mean'] - month_mean + seasonal_min = result['min'] - month_min + seasonal_max = result['max'] - month_max + result['meanSeasonal'] = seasonal_mean + result['minSeasonal'] = seasonal_min + result['maxSeasonal'] = seasonal_max + self.log.info( + "Seasonal calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds)) + + the_time = datetime.now() + filtering.applyAllFiltersOnField(results, 'mean', applySeasonal=False, applyLowPass=apply_low_pass_filter) + filtering.applyAllFiltersOnField(results, 'max', applySeasonal=False, applyLowPass=apply_low_pass_filter) + filtering.applyAllFiltersOnField(results, 'min', applySeasonal=False, applyLowPass=apply_low_pass_filter) + + if apply_seasonal_cycle_filter and apply_low_pass_filter: + try: + filtering.applyFiltersOnField(results, 'meanSeasonal', applySeasonal=False, applyLowPass=True, + append="LowPass") + filtering.applyFiltersOnField(results, 'minSeasonal', applySeasonal=False, applyLowPass=True, + append="LowPass") + filtering.applyFiltersOnField(results, 'maxSeasonal', applySeasonal=False, applyLowPass=True, + append="LowPass") + except Exception as e: + # If it doesn't work log the error but ignore it + tb = traceback.format_exc() + self.log.warn("Error calculating SeasonalLowPass filter:\n%s" % tb) + + self.log.info( + "LowPass filter calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds)) + + return results, {} + + @lru_cache() + def calculate_monthly_average(self, month=None, bounding_polygon_wkt=None, ds=None): + + min_date, max_date = self.get_min_max_date(ds=ds) + + monthly_averages, monthly_counts = [], [] + monthly_mins, monthly_maxes = [], [] + bounding_polygon = shapely.wkt.loads(bounding_polygon_wkt) + for year in range(min_date.year, max_date.year + 1): + beginning_of_month = datetime(year, month, 1) + end_of_month = datetime(year, month, calendar.monthrange(year, month)[1], 23, 59, 59) + start = (pytz.UTC.localize(beginning_of_month) - EPOCH).total_seconds() + end = (pytz.UTC.localize(end_of_month) - EPOCH).total_seconds() + tile_stats = self._tile_service.find_tiles_in_polygon(bounding_polygon, ds, start, end, + fl=('id,' + 'tile_avg_val_d,tile_count_i,' + 'tile_min_val_d,tile_max_val_d,' + 'tile_min_lat,tile_max_lat,' + 'tile_min_lon,tile_max_lon'), + fetch_data=False) + if len(tile_stats) == 0: + continue + + # Split list into tiles on the border of the bounding box and tiles completely inside the bounding box. + border_tiles, inner_tiles = [], [] + for tile in tile_stats: + inner_tiles.append(tile) if bounding_polygon.contains(shapely.geometry.box(tile.bbox.min_lon, + tile.bbox.min_lat, + tile.bbox.max_lon, + tile.bbox.max_lat)) else border_tiles.append( + tile) + + # We can use the stats of the inner tiles directly + tile_means = [tile.tile_stats.mean for tile in inner_tiles] + tile_mins = [tile.tile_stats.min for tile in inner_tiles] + tile_maxes = [tile.tile_stats.max for tile in inner_tiles] + tile_counts = [tile.tile_stats.count for tile in inner_tiles] + + # Border tiles need have the data loaded, masked, and stats recalculated + border_tiles = list(self._tile_service.fetch_data_for_tiles(*border_tiles)) + border_tiles = self._tile_service.mask_tiles_to_polygon(bounding_polygon, border_tiles) + for tile in border_tiles: + tile.update_stats() + tile_means.append(tile.tile_stats.mean) + tile_mins.append(tile.tile_stats.min) + tile_maxes.append(tile.tile_stats.max) + tile_counts.append(tile.tile_stats.count) + + tile_means = np.array(tile_means) + tile_mins = np.array(tile_mins) + tile_maxes = np.array(tile_maxes) + tile_counts = np.array(tile_counts) + + sum_tile_counts = np.sum(tile_counts) * 1.0 + + monthly_averages += [np.average(tile_means, None, tile_counts / sum_tile_counts).item()] + monthly_mins += [np.average(tile_mins, None, tile_counts / sum_tile_counts).item()] + monthly_maxes += [np.average(tile_maxes, None, tile_counts / sum_tile_counts).item()] + monthly_counts += [sum_tile_counts] + + count_sum = np.sum(monthly_counts) * 1.0 + weights = np.array(monthly_counts) / count_sum + + return np.average(monthly_averages, None, weights).item(), \ + np.average(monthly_averages, None, weights).item(), \ + np.average(monthly_averages, None, weights).item() + + @lru_cache() + def get_min_max_date(self, ds=None): + min_date = pytz.timezone('UTC').localize( + datetime.utcfromtimestamp(self._tile_service.get_min_time([], ds=ds))) + max_date = pytz.timezone('UTC').localize( + datetime.utcfromtimestamp(self._tile_service.get_max_time([], ds=ds))) + + return min_date.date(), max_date.date() + + @staticmethod + def calculate_comparison_stats(results): + xy = [[], []] + + for item in results: + if len(item) == 2: + xy[item[0]["ds"]].append(item[0]["mean"]) + xy[item[1]["ds"]].append(item[1]["mean"]) + + slope, intercept, r_value, p_value, std_err = stats.linregress(xy[0], xy[1]) + comparisonStats = { + "slope": slope, + "intercept": intercept, + "r": r_value, + "p": p_value, + "err": std_err + } + + return comparisonStats + + +class TimeSeriesResults(NexusResults): + LINE_PLOT = "line" + SCATTER_PLOT = "scatter" + + __SERIES_COLORS = ['red', 'blue'] + + def toImage(self): + + type = self.computeOptions().get_plot_type() + + if type == TimeSeriesResults.LINE_PLOT or type == "default": + return self.createLinePlot() + elif type == TimeSeriesResults.SCATTER_PLOT: + return self.createScatterPlot() + else: + raise Exception("Invalid or unsupported time series plot specified") + + def createScatterPlot(self): + timeSeries = [] + series0 = [] + series1 = [] + + res = self.results() + meta = self.meta() + + plotSeries = self.computeOptions().get_plot_series() if self.computeOptions is not None else None + if plotSeries is None: + plotSeries = "mean" + + for m in res: + if len(m) == 2: + timeSeries.append(datetime.fromtimestamp(m[0]["time"] / 1000)) + series0.append(m[0][plotSeries]) + series1.append(m[1][plotSeries]) + + title = ', '.join(set([m['title'] for m in meta])) + sources = ', '.join(set([m['source'] for m in meta])) + dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y')) + + fig, ax = plt.subplots() + fig.set_size_inches(11.0, 8.5) + ax.scatter(series0, series1, alpha=0.5) + ax.set_xlabel(meta[0]['units']) + ax.set_ylabel(meta[1]['units']) + ax.set_title("%s\n%s\n%s" % (title, sources, dateRange)) + + par = np.polyfit(series0, series1, 1, full=True) + slope = par[0][0] + intercept = par[0][1] + xl = [min(series0), max(series0)] + yl = [slope * xx + intercept for xx in xl] + plt.plot(xl, yl, '-r') + + ax.grid(True) + fig.tight_layout() + + sio = StringIO() + plt.savefig(sio, format='png') + return sio.getvalue() + + def createLinePlot(self): + nseries = len(self.meta()) + res = self.results() + meta = self.meta() + + timeSeries = [datetime.fromtimestamp(m[0]["time"] / 1000) for m in res] + + means = [[np.nan] * len(res) for n in range(0, nseries)] + + plotSeries = self.computeOptions().get_plot_series() if self.computeOptions is not None else None + if plotSeries is None: + plotSeries = "mean" + + for n in range(0, len(res)): + timeSlot = res[n] + for seriesValues in timeSlot: + means[seriesValues['ds']][n] = seriesValues[plotSeries] + + x = timeSeries + + fig, axMain = plt.subplots() + fig.set_size_inches(11.0, 8.5) + fig.autofmt_xdate() + + title = ', '.join(set([m['title'] for m in meta])) + sources = ', '.join(set([m['source'] for m in meta])) + dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y')) + + axMain.set_title("%s\n%s\n%s" % (title, sources, dateRange)) + axMain.set_xlabel('Date') + axMain.grid(True) + axMain.xaxis.set_major_locator(mdates.YearLocator()) + axMain.xaxis.set_major_formatter(mdates.DateFormatter('%b %Y')) + axMain.xaxis.set_minor_locator(mdates.MonthLocator()) + axMain.format_xdata = mdates.DateFormatter('%Y-%m-%d') + + plots = [] + + for n in range(0, nseries): + if n == 0: + ax = axMain + else: + ax = ax.twinx() + + plots += ax.plot(x, means[n], color=self.__SERIES_COLORS[n], zorder=10, linewidth=3, label=meta[n]['title']) + ax.set_ylabel(meta[n]['units']) + + labs = [l.get_label() for l in plots] + axMain.legend(plots, labs, loc=0) + + sio = StringIO() + plt.savefig(sio, format='png') + return sio.getvalue() + + +class TimeSeriesCalculator(object): + def __init__(self): + self.__tile_service = NexusTileService() + + def calc_average_on_day(self, bounding_polygon_wkt, dataset, timeinseconds): + bounding_polygon = shapely.wkt.loads(bounding_polygon_wkt) + ds1_nexus_tiles = self.__tile_service.get_tiles_bounded_by_polygon_at_time(bounding_polygon, + dataset, + timeinseconds) + + # If all data ends up getting masked, ds1_nexus_tiles will be empty + if len(ds1_nexus_tiles) == 0: + return {} + + tile_data_agg = np.ma.array([tile.data for tile in ds1_nexus_tiles]) + data_min = np.ma.min(tile_data_agg) + data_max = np.ma.max(tile_data_agg) + daily_mean = np.ma.mean(tile_data_agg).item() + data_count = np.ma.count(tile_data_agg) + try: + data_count = data_count.item() + except AttributeError: + pass + data_std = np.ma.std(tile_data_agg) + + # Return Stats by day + stat = { + 'min': data_min, + 'max': data_max, + 'mean': daily_mean, + 'cnt': data_count, + 'std': data_std, + 'time': int(timeinseconds) + } + return stat + + +def pool_worker(work_queue, done_queue): + try: + calculator = TimeSeriesCalculator() + + for work in iter(work_queue.get, SENTINEL): + scifunction = work[0] + args = work[1:] + result = calculator.__getattribute__(scifunction)(*args) + done_queue.put(result) + + except Exception as e: + e_str = traceback.format_exc(e) + done_queue.put({'error': e_str}) http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/TimeSeriesSolr.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/TimeSeriesSolr.py b/analysis/webservice/algorithms/TimeSeriesSolr.py new file mode 100644 index 0000000..1da55b2 --- /dev/null +++ b/analysis/webservice/algorithms/TimeSeriesSolr.py @@ -0,0 +1,342 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" +import sys +import traceback +import logging +from cStringIO import StringIO +from datetime import datetime +from multiprocessing.dummy import Pool, Manager + +import matplotlib.dates as mdates +import matplotlib.pyplot as plt +import numpy as np +from webservice.NexusHandler import NexusHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC +from nexustiles.nexustiles import NexusTileService +from scipy import stats + +from webservice import Filtering as filt +from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException + +SENTINEL = 'STOP' + + +@nexus_handler +class TimeSeriesHandlerImpl(NexusHandler): + name = "Time Series Solr" + path = "/statsSolr" + description = "Computes a time series plot between one or more datasets given an arbitrary geographical area and time range" + params = DEFAULT_PARAMETERS_SPEC + singleton = True + + def __init__(self): + NexusHandler.__init__(self, skipCassandra=True) + self.log = logging.getLogger(__name__) + + def calc(self, computeOptions, **args): + """ + + :param computeOptions: StatsComputeOptions + :param args: dict + :return: + """ + + ds = computeOptions.get_dataset() + + if type(ds) != list and type(ds) != tuple: + ds = (ds,) + + resultsRaw = [] + + for shortName in ds: + results, meta = self.getTimeSeriesStatsForBoxSingleDataSet(computeOptions.get_min_lat(), + computeOptions.get_max_lat(), + computeOptions.get_min_lon(), + computeOptions.get_max_lon(), + shortName, + computeOptions.get_start_time(), + computeOptions.get_end_time(), + computeOptions.get_apply_seasonal_cycle_filter(), + computeOptions.get_apply_low_pass_filter()) + resultsRaw.append([results, meta]) + + results = self._mergeResults(resultsRaw) + + if len(ds) == 2: + stats = self.calculateComparisonStats(results, suffix="") + if computeOptions.get_apply_seasonal_cycle_filter(): + s = self.calculateComparisonStats(results, suffix="Seasonal") + stats = self._mergeDicts(stats, s) + if computeOptions.get_apply_low_pass_filter(): + s = self.calculateComparisonStats(results, suffix="LowPass") + stats = self._mergeDicts(stats, s) + if computeOptions.get_apply_seasonal_cycle_filter() and computeOptions.get_apply_low_pass_filter(): + s = self.calculateComparisonStats(results, suffix="SeasonalLowPass") + stats = self._mergeDicts(stats, s) + else: + stats = {} + + meta = [] + for singleRes in resultsRaw: + meta.append(singleRes[1]) + + res = TimeSeriesResults(results=results, meta=meta, stats=stats, computeOptions=computeOptions) + return res + + def getTimeSeriesStatsForBoxSingleDataSet(self, min_lat, max_lat, min_lon, max_lon, ds, start_time=0, end_time=-1, + applySeasonalFilter=True, applyLowPass=True): + + daysinrange = self._tile_service.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, ds, start_time, + end_time) + + if len(daysinrange) == 0: + raise NoDataException(reason="No data found for selected timeframe") + + maxprocesses = int(self.algorithm_config.get("multiprocessing", "maxprocesses")) + + results = [] + if maxprocesses == 1: + calculator = TimeSeriesCalculator() + for dayinseconds in daysinrange: + result = calculator.calc_average_on_day(min_lat, max_lat, min_lon, max_lon, ds, dayinseconds) + results.append(result) + else: + # Create a task to calc average difference for each day + manager = Manager() + work_queue = manager.Queue() + done_queue = manager.Queue() + for dayinseconds in daysinrange: + work_queue.put( + ('calc_average_on_day', min_lat, max_lat, min_lon, max_lon, ds, dayinseconds)) + [work_queue.put(SENTINEL) for _ in xrange(0, maxprocesses)] + + # Start new processes to handle the work + pool = Pool(maxprocesses) + [pool.apply_async(pool_worker, (work_queue, done_queue)) for _ in xrange(0, maxprocesses)] + pool.close() + + # Collect the results as [(day (in ms), average difference for that day)] + for i in xrange(0, len(daysinrange)): + result = done_queue.get() + try: + error_str = result['error'] + self.log.error(error_str) + raise NexusProcessingException(reason="Error calculating average by day.") + except KeyError: + pass + + results.append(result) + + pool.terminate() + manager.shutdown() + + results = sorted(results, key=lambda entry: entry["time"]) + + filt.applyAllFiltersOnField(results, 'mean', applySeasonal=applySeasonalFilter, applyLowPass=applyLowPass) + filt.applyAllFiltersOnField(results, 'max', applySeasonal=applySeasonalFilter, applyLowPass=applyLowPass) + filt.applyAllFiltersOnField(results, 'min', applySeasonal=applySeasonalFilter, applyLowPass=applyLowPass) + + return results, {} + + def calculateComparisonStats(self, results, suffix=""): + + xy = [[], []] + + for item in results: + if len(item) == 2: + xy[item[0]["ds"]].append(item[0]["mean%s" % suffix]) + xy[item[1]["ds"]].append(item[1]["mean%s" % suffix]) + + slope, intercept, r_value, p_value, std_err = stats.linregress(xy[0], xy[1]) + comparisonStats = { + "slope%s" % suffix: slope, + "intercept%s" % suffix: intercept, + "r%s" % suffix: r_value, + "p%s" % suffix: p_value, + "err%s" % suffix: std_err + } + + return comparisonStats + + +class TimeSeriesResults(NexusResults): + LINE_PLOT = "line" + SCATTER_PLOT = "scatter" + + __SERIES_COLORS = ['red', 'blue'] + + def __init__(self, results=None, meta=None, stats=None, computeOptions=None): + NexusResults.__init__(self, results=results, meta=meta, stats=stats, computeOptions=computeOptions) + + def toImage(self): + + type = self.computeOptions().get_plot_type() + + if type == TimeSeriesResults.LINE_PLOT or type == "default": + return self.createLinePlot() + elif type == TimeSeriesResults.SCATTER_PLOT: + return self.createScatterPlot() + else: + raise Exception("Invalid or unsupported time series plot specified") + + def createScatterPlot(self): + timeSeries = [] + series0 = [] + series1 = [] + + res = self.results() + meta = self.meta() + + plotSeries = self.computeOptions().get_plot_series() if self.computeOptions is not None else None + if plotSeries is None: + plotSeries = "mean" + + for m in res: + if len(m) == 2: + timeSeries.append(datetime.fromtimestamp(m[0]["time"] / 1000)) + series0.append(m[0][plotSeries]) + series1.append(m[1][plotSeries]) + + title = ', '.join(set([m['title'] for m in meta])) + sources = ', '.join(set([m['source'] for m in meta])) + dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y')) + + fig, ax = plt.subplots() + fig.set_size_inches(11.0, 8.5) + ax.scatter(series0, series1, alpha=0.5) + ax.set_xlabel(meta[0]['units']) + ax.set_ylabel(meta[1]['units']) + ax.set_title("%s\n%s\n%s" % (title, sources, dateRange)) + + par = np.polyfit(series0, series1, 1, full=True) + slope = par[0][0] + intercept = par[0][1] + xl = [min(series0), max(series0)] + yl = [slope * xx + intercept for xx in xl] + plt.plot(xl, yl, '-r') + + # r = self.stats()["r"] + # plt.text(0.5, 0.5, "r = foo") + + ax.grid(True) + fig.tight_layout() + + sio = StringIO() + plt.savefig(sio, format='png') + return sio.getvalue() + + def createLinePlot(self): + nseries = len(self.meta()) + res = self.results() + meta = self.meta() + + timeSeries = [datetime.fromtimestamp(m[0]["time"] / 1000) for m in res] + + means = [[np.nan] * len(res) for n in range(0, nseries)] + + plotSeries = self.computeOptions().get_plot_series() if self.computeOptions is not None else None + if plotSeries is None: + plotSeries = "mean" + + for n in range(0, len(res)): + timeSlot = res[n] + for seriesValues in timeSlot: + means[seriesValues['ds']][n] = seriesValues[plotSeries] + + x = timeSeries + + fig, axMain = plt.subplots() + fig.set_size_inches(11.0, 8.5) + fig.autofmt_xdate() + + title = ', '.join(set([m['title'] for m in meta])) + sources = ', '.join(set([m['source'] for m in meta])) + dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y')) + + axMain.set_title("%s\n%s\n%s" % (title, sources, dateRange)) + axMain.set_xlabel('Date') + axMain.grid(True) + axMain.xaxis.set_major_locator(mdates.YearLocator()) + axMain.xaxis.set_major_formatter(mdates.DateFormatter('%b %Y')) + axMain.xaxis.set_minor_locator(mdates.MonthLocator()) + axMain.format_xdata = mdates.DateFormatter('%Y-%m-%d') + + plots = [] + + for n in range(0, nseries): + if n == 0: + ax = axMain + else: + ax = ax.twinx() + + plots += ax.plot(x, means[n], color=self.__SERIES_COLORS[n], zorder=10, linewidth=3, label=meta[n]['title']) + ax.set_ylabel(meta[n]['units']) + + labs = [l.get_label() for l in plots] + axMain.legend(plots, labs, loc=0) + + sio = StringIO() + plt.savefig(sio, format='png') + return sio.getvalue() + + +class TimeSeriesCalculator(object): + def __init__(self): + self.__tile_service = NexusTileService() + + def calc_average_on_day(self, min_lat, max_lat, min_lon, max_lon, dataset, timeinseconds): + # Get stats using solr only + ds1_nexus_tiles_stats = self.__tile_service.get_stats_within_box_at_time(min_lat, max_lat, min_lon, max_lon, + dataset, + timeinseconds) + + data_min_within = min([tile["tile_min_val_d"] for tile in ds1_nexus_tiles_stats]) + data_max_within = max([tile["tile_max_val_d"] for tile in ds1_nexus_tiles_stats]) + data_sum_within = sum([tile["product(tile_avg_val_d, tile_count_i)"] for tile in ds1_nexus_tiles_stats]) + data_count_within = sum([tile["tile_count_i"] for tile in ds1_nexus_tiles_stats]) + + # Get boundary tiles and calculate stats + ds1_nexus_tiles = self.__tile_service.get_boundary_tiles_at_time(min_lat, max_lat, min_lon, max_lon, + dataset, + timeinseconds) + + tile_data_agg = np.ma.array([tile.data for tile in ds1_nexus_tiles]) + data_min_boundary = np.ma.min(tile_data_agg) + data_max_boundary = np.ma.max(tile_data_agg) + #daily_mean = np.ma.mean(tile_data_agg).item() + data_sum_boundary = np.ma.sum(tile_data_agg) + data_count_boundary = np.ma.count(tile_data_agg).item() + #data_std = np.ma.std(tile_data_agg) + + # Combine stats + data_min = min(data_min_within, data_min_boundary) + data_max = max(data_max_within, data_max_boundary) + data_count = data_count_within + data_count_boundary + daily_mean = (data_sum_within + data_sum_boundary) / data_count + data_std = 0 + + # Return Stats by day + stat = { + 'min': data_min, + 'max': data_max, + 'mean': daily_mean, + 'cnt': data_count, + 'std': data_std, + 'time': int(timeinseconds) + } + return stat + + +def pool_worker(work_queue, done_queue): + try: + calculator = TimeSeriesCalculator() + + for work in iter(work_queue.get, SENTINEL): + scifunction = work[0] + args = work[1:] + result = calculator.__getattribute__(scifunction)(*args) + done_queue.put(result) + except Exception as e: + e_str = traceback.format_exc(e) + done_queue.put({'error': e_str}) http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/__init__.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/__init__.py b/analysis/webservice/algorithms/__init__.py new file mode 100644 index 0000000..fb5ecc8 --- /dev/null +++ b/analysis/webservice/algorithms/__init__.py @@ -0,0 +1,20 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" +import Capabilities +import CorrelationMap +import DailyDifferenceAverage +import DataInBoundsSearch +import DataSeriesList +import DelayTest +import ErrorTosserTest +import Heartbeat +import HofMoeller +import LongitudeLatitudeMap +import TestInitializer +import TileSearch +import TimeAvgMap +import TimeSeries +import TimeSeriesSolr +import StandardDeviationSearch \ No newline at end of file
