http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/tests/algorithms_spark/__init__.py ---------------------------------------------------------------------- diff --git a/analysis/tests/algorithms_spark/__init__.py b/analysis/tests/algorithms_spark/__init__.py new file mode 100644 index 0000000..bd9282c --- /dev/null +++ b/analysis/tests/algorithms_spark/__init__.py @@ -0,0 +1,4 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" \ No newline at end of file
"""
Copyright (c) 2016 Jet Propulsion Laboratory,
California Institute of Technology. All rights reserved
"""

import logging
import traceback

import numpy as np
from scipy.signal import filtfilt, butter

log = logging.getLogger('Filtering')


def __fieldToList(results, field):
    """Extract one named field from a list of result dicts into a 1-d float array.

    :param results: sequence of dict-like result rows
    :param field: key to read from each row
    :return: numpy float64 array, one entry per row
    """
    return np.array([row[field] for row in results], dtype=np.float64)


def __listToField(results, l, field):
    """Write the values of ``l`` back into ``results[n][field]``, position by position.

    :raises Exception: if either argument is None or the lengths differ
    """
    if results is None or l is None:
        raise Exception("Cannot transpose values if they're null")

    if not len(results) == len(l):
        raise Exception("Cannot transpose values between lists of inequal length")

    for row, value in zip(results, l):
        row[field] = value


def applySeasonalCycleFilter1d(l):
    """Remove the mean seasonal (monthly) cycle from a 1-d monthly series, in place.

    For each of the 12 calendar-month positions, the average of every 12th
    sample is subtracted from those samples.  Series of 12 points or fewer are
    returned unchanged (not enough data to estimate a cycle).

    :param l: mutable sequence of monthly values
    :return: the same sequence, seasonally adjusted in place
    """
    if len(l) <= 12:
        return l

    for month in range(12):
        month_indices = range(month, len(l), 12)
        monthly_mean = np.average([l[i] for i in month_indices])
        for i in month_indices:
            l[i] -= monthly_mean
    return l


def applySeasonalCycleFilter2d(l):
    """Placeholder for 2-d seasonal filtering; currently a no-op pass-through."""
    return l
def applySeasonalCycleFilter(l):
    """Implements monthly filtering of seasonal cycles.

    Dispatches on array rank: 1-d input is filtered month by month, 2-d input
    is currently passed through unchanged.

    :raises Exception: for arrays of any other rank
    """
    rank = len(np.shape(l))
    if rank == 1:
        return applySeasonalCycleFilter1d(l)
    elif rank == 2:
        return applySeasonalCycleFilter2d(l)
    else:
        raise Exception("Cannot apply seasonal cycle filter: Unsupported array shape")


def applySeasonalCycleFilterOnResultsField(results, field):
    """Seasonally filter one named field of a result set, in place."""
    values = __fieldToList(results, field)
    applySeasonalCycleFilter(values)
    __listToField(results, values, field)


def applySeasonalCycleFilterOnResults(results):
    """Seasonally filter the 'mean', 'max' and 'min' fields of a result set."""
    for field in ('mean', 'max', 'min'):
        applySeasonalCycleFilterOnResultsField(results, field)


def applyLowPassFilter(y, lowcut=12.0, order=9.0):
    """Low-pass filter a series with a Butterworth filter applied via filtfilt.

    Reference:
    http://www.nehalemlabs.net/prototype/blog/2013/04/05/an-introduction-to-smoothing-time-series-in-python-part-i-filtering-theory/

    :param y: input series
    :param lowcut: cutoff expressed in samples; reduced to 3 for short series
    :param order: Butterworth filter order
    :return: filtered copy of ``y`` (numpy array); the input is not modified
    """
    if len(y) - 12 <= lowcut:
        lowcut = 3
    nyq = 0.5 * len(y)
    low = lowcut / nyq
    b, a = butter(order, low)
    # filtfilt's default padlen can exceed very short inputs; clamp it.
    padlen = min(30, len(y), len(a), len(b))
    return filtfilt(b, a, y, padlen=padlen)


def applyFiltersOnField(results, field, applySeasonal=False, applyLowPass=False, append=""):
    """Apply the selected filters to one field, storing output in ``field + append``."""
    x = __fieldToList(results, field)

    if applySeasonal:
        x = applySeasonalCycleFilter(x)
    if applyLowPass:
        x = applyLowPassFilter(x)
    __listToField(results, x, "%s%s" % (field, append))


def applyAllFiltersOnField(results, field, applySeasonal=True, applyLowPass=True):
    """Compute each requested filter combination for ``field``.

    Filtering is best-effort: a failure in any combination is logged and
    ignored so the remaining combinations (and the caller) still proceed.
    """
    # (suffix, enabled, seasonal flag, low-pass flag) for each combination.
    combinations = (
        ("Seasonal", applySeasonal, True, False),
        ("LowPass", applyLowPass, False, True),
        ("SeasonalLowPass", applySeasonal and applyLowPass, True, True),
    )
    for append, enabled, seasonal, lowpass in combinations:
        if not enabled:
            continue
        try:
            applyFiltersOnField(results, field, applySeasonal=seasonal, applyLowPass=lowpass, append=append)
        except Exception:
            # If it doesn't work log the error but ignore it
            log.warning("Error calculating %s filter:\n%s" % (append, traceback.format_exc()))
# Master switch: when True, every layer is considered enabled in every
# environment and the per-layer table below is ignored.
ALL_LAYERS_ENABLED = True

# Each entry maps a dataset shortName to the environments ("envs") in which it
# is visible; "ALL" means every environment.
LAYERS = [
    {"shortName": "NCDC-L4LRblend-GLOB-AVHRR_OI", "envs": ("ALL",)},
    {"shortName": "SSH_alti_1deg_1mon", "envs": ("ALL",)},
    {"shortName": "SIacSubl_ECCO_version4_release1", "envs": ("ALL",)},
    {"shortName": "PHIBOT_ECCO_version4_release1", "envs": ("ALL",)},
    {"shortName": "SIhsnow_ECCO_version4_release1", "envs": ("ALL",)},
    {"shortName": "SIheff_ECCO_version4_release1", "envs": ("ALL",)},
    {"shortName": "oceFWflx_ECCO_version4_release1", "envs": ("ALL",)},
    {"shortName": "oceQnet_ECCO_version4_release1", "envs": ("ALL",)},
    {"shortName": "MXLDEPTH_ECCO_version4_release1", "envs": ("ALL",)},
    {"shortName": "SIatmQnt_ECCO_version4_release1", "envs": ("ALL",)},
    {"shortName": "oceSPflx_ECCO_version4_release1", "envs": ("ALL",)},
    {"shortName": "oceSPDep_ECCO_version4_release1", "envs": ("ALL",)},
    {"shortName": "SIarea_ECCO_version4_release1", "envs": ("ALL",)},
    {"shortName": "ETAN_ECCO_version4_release1", "envs": ("ALL",)},
    {"shortName": "sIceLoad_ECCO_version4_release1", "envs": ("ALL",)},
    {"shortName": "oceQsw_ECCO_version4_release1", "envs": ("ALL",)},
    {"shortName": "SIsnPrcp_ECCO_version4_release1", "envs": ("ALL",)},
    {"shortName": "DETADT2_ECCO_version4_release1", "envs": ("ALL",)},
    {"shortName": "TFLUX_ECCO_version4_release1", "envs": ("ALL",)},
    {"shortName": "SItflux_ECCO_version4_release1", "envs": ("ALL",)},
    {"shortName": "SFLUX_ECCO_version4_release1", "envs": ("ALL",)},
    {"shortName": "TELLUS_GRACE_MASCON_GRID_RL05_V1_LAND", "envs": ("ALL",)},
    {"shortName": "TELLUS_GRACE_MASCON_GRID_RL05_V1_OCEAN", "envs": ("ALL",)},
    {"shortName": "Sea_Surface_Anomalies", "envs": ("DEV",)},
    {"shortName": "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1", "envs": ("ALL",)},
]


def isLayerEnabled(shortName, env):
    """Return True if the named layer is enabled for the given environment.

    :param shortName: dataset short name to look up in LAYERS
    :param env: environment name (case-insensitive); None is treated as "PROD"
    :return: True when the layer is visible in ``env``, False otherwise
    """
    if ALL_LAYERS_ENABLED:
        return True
    if env is None:
        env = "PROD"
    env = env.upper()
    # Everything is visible in the development environment.
    if env == "DEV":
        return True
    for layer in LAYERS:
        if layer["shortName"] == shortName and ("ALL" in layer["envs"] or env in layer["envs"]):
            return True
    return False


if __name__ == "__main__":
    # print() with a single argument is valid under both Python 2 and 3.
    print(isLayerEnabled("NCDC-L4LRblend-GLOB-AVHRR_OI", None))
    print(isLayerEnabled("NCDC-L4LRblend-GLOB-AVHRR_OI", "PROD"))
    print(isLayerEnabled("NCDC-L4LRblend-GLOB-AVHRR_OI", "SIT"))

    print(isLayerEnabled("TFLUX_ECCO_version4_release1", None))
    print(isLayerEnabled("TFLUX_ECCO_version4_release1", "PROD"))
    print(isLayerEnabled("TFLUX_ECCO_version4_release1", "SIT"))
"""
Copyright (c) 2016 Jet Propulsion Laboratory,
California Institute of Technology. All rights reserved
"""
import sys
import logging
import time
import types
from datetime import datetime

import numpy as np

# Registries populated at import time by the decorators below.
AVAILABLE_HANDLERS = []
AVAILABLE_INITIALIZERS = []


def nexus_initializer(clazz):
    """Class decorator that registers ``clazz`` as a service initializer.

    Registration failures are logged and swallowed so one bad initializer
    cannot prevent the rest of the service from loading.
    """
    log = logging.getLogger(__name__)
    try:
        wrapper = NexusInitializerWrapper(clazz)
        log.info("Adding initializer '%s'" % wrapper.clazz())
        AVAILABLE_INITIALIZERS.append(wrapper)
    except Exception as ex:
        # str(ex) instead of ex.message: .message is Python-2-only/deprecated.
        log.warning("Initializer '%s' failed to load (reason: %s)" % (clazz, str(ex)), exc_info=True)
    return clazz


def nexus_handler(clazz):
    """Class decorator that registers ``clazz`` as an algorithm endpoint."""
    log = logging.getLogger(__name__)
    try:
        wrapper = AlgorithmModuleWrapper(clazz)
        log.info("Adding algorithm module '%s' with path '%s' (%s)" % (wrapper.name(), wrapper.path(), wrapper.clazz()))
        AVAILABLE_HANDLERS.append(wrapper)
    except Exception as ex:
        log.warning("Handler '%s' is invalid and will be skipped (reason: %s)" % (clazz, str(ex)), exc_info=True)
    return clazz


# Request parameters shared by most algorithm endpoints; handlers that extend
# this spec must COPY it first (mutating it in place leaks into every handler).
DEFAULT_PARAMETERS_SPEC = {
    "ds": {
        "name": "Dataset",
        "type": "string",
        "description": "One or more comma-separated dataset shortnames"
    },
    "minLat": {
        "name": "Minimum Latitude",
        "type": "float",
        "description": "Minimum (Southern) bounding box Latitude"
    },
    "maxLat": {
        "name": "Maximum Latitude",
        "type": "float",
        "description": "Maximum (Northern) bounding box Latitude"
    },
    "minLon": {
        "name": "Minimum Longitude",
        "type": "float",
        "description": "Minimum (Western) bounding box Longitude"
    },
    "maxLon": {
        "name": "Maximum Longitude",
        "type": "float",
        "description": "Maximum (Eastern) bounding box Longitude"
    },
    "startTime": {
        "name": "Start Time",
        "type": "long integer",
        "description": "Starting time in milliseconds since midnight Jan. 1st, 1970 UTC"
    },
    "endTime": {
        "name": "End Time",
        "type": "long integer",
        "description": "Ending time in milliseconds since midnight Jan. 1st, 1970 UTC"
    },
    "lowPassFilter": {
        "name": "Apply Low Pass Filter",
        "type": "boolean",
        "description": "Specifies whether to apply a low pass filter on the analytics results"
    },
    "seasonalFilter": {
        "name": "Apply Seasonal Filter",
        "type": "boolean",
        "description": "Specified whether to apply a seasonal cycle filter on the analytics results"
    }
}


class NexusInitializerWrapper:
    """Wraps an initializer class, ensuring init() exists and runs at most once."""

    def __init__(self, clazz):
        self.__log = logging.getLogger(__name__)
        self.__hasBeenRun = False
        self.__clazz = clazz
        self.validate()

    def validate(self):
        """Raise if the wrapped class does not declare an ``init`` method."""
        if "init" not in self.__clazz.__dict__ or not isinstance(self.__clazz.__dict__["init"], types.FunctionType):
            raise Exception("Method 'init' has not been declared")

    def clazz(self):
        return self.__clazz

    def hasBeenRun(self):
        return self.__hasBeenRun

    def init(self, config):
        """Instantiate the wrapped class and run its init() exactly once."""
        if not self.__hasBeenRun:
            self.__hasBeenRun = True
            instance = self.__clazz()
            instance.init(config)
        else:
            # BUG FIX: previously called self.log(...), which does not exist.
            self.__log.info("Initializer '%s' has already been run" % self.__clazz)


class AlgorithmModuleWrapper:
    """Wraps an algorithm class and validates its required interface."""

    def __init__(self, clazz):
        self.__instance = None
        self.__clazz = clazz
        self.validate()

    def validate(self):
        """Raise unless the wrapped class declares calc() and all metadata properties."""
        if "calc" not in self.__clazz.__dict__ or not isinstance(self.__clazz.__dict__["calc"], types.FunctionType):
            raise Exception("Method 'calc' has not been declared")

        for prop in ("path", "name", "description", "params"):
            if prop not in self.__clazz.__dict__:
                raise Exception("Property '%s' has not been defined" % prop)

    def clazz(self):
        return self.__clazz

    def name(self):
        return self.__clazz.name

    def path(self):
        return self.__clazz.path

    def description(self):
        return self.__clazz.description

    def params(self):
        return self.__clazz.params

    def instance(self, algorithm_config=None, sc=None):
        """Return an instance of the wrapped class (cached when 'singleton' is True).

        set_config / set_spark_context are applied when the instance supports
        them; classes without those methods are silently accepted.
        """
        if self.__clazz.__dict__.get("singleton") is True:
            if self.__instance is None:
                self.__instance = self.__clazz()
            instance = self.__instance
        else:
            instance = self.__clazz()

        try:
            instance.set_config(algorithm_config)
        except AttributeError:
            pass

        try:
            # BUG FIX: the non-singleton branch previously called
            # self.__instance.set_spark_context (usually None) instead of the
            # freshly created instance, silently dropping the Spark context.
            instance.set_spark_context(sc)
        except AttributeError:
            pass

        return instance

    def isValid(self):
        """True when validate() passes for the wrapped class."""
        try:
            self.validate()
            return True
        except Exception:
            return False


class CalcHandler(object):
    def calc(self, computeOptions, **args):
        """Subclasses must override this to perform their computation."""
        raise Exception("calc() not yet implemented")


class NexusHandler(CalcHandler):
    """Base class for algorithm handlers backed by the Nexus tile service."""

    def __init__(self, skipCassandra=False, skipSolr=False):
        CalcHandler.__init__(self)
        self.algorithm_config = None
        # Imported lazily so this module can be imported (e.g. for handler
        # validation or tests) without the nexustiles dependency installed.
        from nexustiles.nexustiles import NexusTileService
        self._tile_service = NexusTileService(skipCassandra, skipSolr)

    def set_config(self, algorithm_config):
        self.algorithm_config = algorithm_config

    def _mergeDicts(self, x, y):
        """Return a new dict with ``y``'s entries overriding ``x``'s."""
        z = x.copy()
        z.update(y)
        return z

    def _now(self):
        """Current time in integer milliseconds since the epoch."""
        return int(round(time.time() * 1000))

    def _mergeDataSeries(self, resultsData, dataNum, resultsMap):
        """Tag each entry with its dataset index and bucket it by timestamp."""
        for entry in resultsData:
            frmtdTime = entry["time"]
            if frmtdTime not in resultsMap:
                resultsMap[frmtdTime] = []
            entry["ds"] = dataNum
            resultsMap[frmtdTime].append(entry)

    def _resultsMapToList(self, resultsMap):
        """Flatten the time-bucketed map into a list sorted by timestamp."""
        # .items() (not the Python-2-only .iteritems()) keeps this 2/3 compatible.
        resultsList = [value for key, value in resultsMap.items()]
        resultsList.sort(key=lambda entry: entry[0]["time"])
        return resultsList

    def _mergeResults(self, resultsRaw):
        """Merge several raw result series into one time-sorted list."""
        resultsMap = {}
        for i, resultsSeries in enumerate(resultsRaw):
            self._mergeDataSeries(resultsSeries[0], i, resultsMap)
        return self._resultsMapToList(resultsMap)
class SparkHandler(NexusHandler):
    """NexusHandler variant whose calc() runs inside a managed Spark job slot."""

    class SparkJobContext(object):
        """Context manager that checks a job name out of a fixed-size pool."""

        class MaxConcurrentJobsReached(Exception):
            def __init__(self, *args, **kwargs):
                Exception.__init__(self, *args, **kwargs)

        def __init__(self, job_stack):
            self.spark_job_stack = job_stack
            self.job_name = None
            self.log = logging.getLogger(__name__)

        def __enter__(self):
            try:
                self.job_name = self.spark_job_stack.pop()
                self.log.debug("Using %s" % self.job_name)
            except IndexError:
                # Empty stack means every concurrent job slot is in use.
                raise SparkHandler.SparkJobContext.MaxConcurrentJobsReached()
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            if self.job_name is not None:
                self.log.debug("Returning %s" % self.job_name)
                self.spark_job_stack.append(self.job_name)

    def __init__(self, **kwargs):
        import inspect
        NexusHandler.__init__(self, **kwargs)
        self._sc = None
        # BUG FIX: self.log is used throughout this class but was never assigned.
        self.log = logging.getLogger(__name__)

        self.spark_job_stack = []

        def with_spark_job_context(calc_func):
            from functools import wraps

            @wraps(calc_func)
            def wrapped(*args, **kwargs1):
                # Deferred so importing this module does not require webservice.webmodel.
                from webservice.webmodel import NexusProcessingException
                try:
                    with SparkHandler.SparkJobContext(self.spark_job_stack) as job_context:
                        # TODO Pool and Job are forced to a 1-to-1 relationship
                        # Use the closure's ``self`` instead of the Python-2-only
                        # ``calc_func.im_self`` bound-method attribute.
                        self._sc.setLocalProperty("spark.scheduler.pool", job_context.job_name)
                        self._sc.setJobGroup(job_context.job_name, "a spark job")
                        return calc_func(*args, **kwargs1)
                except SparkHandler.SparkJobContext.MaxConcurrentJobsReached:
                    raise NexusProcessingException(code=503,
                                                   reason="Max concurrent requests reached. Please try again later.")

            return wrapped

        # Rebind this instance's calc() so every call is wrapped in a job slot.
        for member in inspect.getmembers(self, predicate=inspect.ismethod):
            if member[0] == "calc":
                setattr(self, member[0], with_spark_job_context(member[1]))

    def set_spark_context(self, sc):
        self._sc = sc

    def set_config(self, algorithm_config):
        """Build the job-slot pool from config ([spark] maxconcurrentjobs, default 10)."""
        if algorithm_config.has_section("spark") and algorithm_config.has_option("spark", "maxconcurrentjobs"):
            max_concurrent_jobs = algorithm_config.getint("spark", "maxconcurrentjobs")
        else:
            max_concurrent_jobs = 10
        self.spark_job_stack = ["Job %s" % x for x in range(1, max_concurrent_jobs + 1)]
        self.algorithm_config = algorithm_config

    def _setQueryParams(self, ds, bounds, start_time=None, end_time=None,
                        start_year=None, end_year=None, clim_month=None,
                        fill=-9999., spark_master=None, spark_nexecs=None,
                        spark_nparts=None):
        """Stash the query parameters on the instance for later use."""
        self._ds = ds
        self._minLat, self._maxLat, self._minLon, self._maxLon = bounds
        self._startTime = start_time
        self._endTime = end_time
        self._startYear = start_year
        self._endYear = end_year
        self._climMonth = clim_month
        self._fill = fill
        self._spark_master = spark_master
        self._spark_nexecs = spark_nexecs
        self._spark_nparts = spark_nparts

    def _find_global_tile_set(self):
        """Find a recent tile set covering the query box and record grid geometry.

        Walks backwards in time from self._endTime in one-day (or 30-day for
        datasets whose name contains "monthly") steps until a window yields
        tiles and both lat/lon resolutions can be determined.  Also records
        the aggregate lat/lon center bounds on the instance.
        """
        if type(self._ds) in (list, tuple):
            ds = self._ds[0]
        else:
            ds = self._ds
        ntiles = 0
        ##################################################################
        # Temporary workaround until we have dataset metadata to indicate
        # temporal resolution.
        if "monthly" in ds.lower():
            t_incr = 2592000  # 30 days
        else:
            t_incr = 86400  # 1 day
        ##################################################################
        t = self._endTime
        self._latRes = None
        self._lonRes = None
        while ntiles == 0:
            nexus_tiles = self._tile_service.get_tiles_bounded_by_box(self._minLat, self._maxLat,
                                                                      self._minLon, self._maxLon,
                                                                      ds=ds, start_time=t - t_incr, end_time=t)
            ntiles = len(nexus_tiles)
            self.log.debug('find_global_tile_set got {0} tiles'.format(ntiles))
            if ntiles > 0:
                for tile in nexus_tiles:
                    self.log.debug('tile coords:')
                    self.log.debug('tile lats: {0}'.format(tile.latitudes))
                    self.log.debug('tile lons: {0}'.format(tile.longitudes))
                    if self._latRes is None:
                        lats = tile.latitudes.data
                        if len(lats) > 1:
                            self._latRes = abs(lats[1] - lats[0])
                    if self._lonRes is None:
                        lons = tile.longitudes.data
                        if len(lons) > 1:
                            self._lonRes = abs(lons[1] - lons[0])
                    if (self._latRes is not None) and (self._lonRes is not None):
                        break
                if (self._latRes is None) or (self._lonRes is None):
                    # Resolution undeterminable from this window; keep walking back.
                    ntiles = 0
                else:
                    lats_agg = np.concatenate([tile.latitudes.compressed()
                                               for tile in nexus_tiles])
                    lons_agg = np.concatenate([tile.longitudes.compressed()
                                               for tile in nexus_tiles])
                    self._minLatCent = np.min(lats_agg)
                    self._maxLatCent = np.max(lats_agg)
                    self._minLonCent = np.min(lons_agg)
                    self._maxLonCent = np.max(lons_agg)
            t -= t_incr
        return nexus_tiles

    def _find_tile_bounds(self, t):
        """Return (min/max lat, min/max lon, min/max y, min/max x) for a tile,
        or None when the tile has no unmasked coordinates."""
        lats = t.latitudes
        lons = t.longitudes
        if (len(lats.compressed()) > 0) and (len(lons.compressed()) > 0):
            min_lat = np.ma.min(lats)
            max_lat = np.ma.max(lats)
            min_lon = np.ma.min(lons)
            max_lon = np.ma.max(lons)
            good_inds_lat = np.where(lats.mask == False)[0]
            good_inds_lon = np.where(lons.mask == False)[0]
            min_y = np.min(good_inds_lat)
            max_y = np.max(good_inds_lat)
            min_x = np.min(good_inds_lon)
            max_x = np.max(good_inds_lon)
            bounds = (min_lat, max_lat, min_lon, max_lon,
                      min_y, max_y, min_x, max_x)
        else:
            self.log.warning('Nothing in this tile!')
            bounds = None
        return bounds

    @staticmethod
    def query_by_parts(tile_service, min_lat, max_lat, min_lon, max_lon,
                       dataset, start_time, end_time, part_dim=0):
        """Fetch tiles for a region/time span, recursively bisecting whenever a
        single query would exceed 100 tiles (or the query fails).

        :param part_dim: bisection axis — 0 = latitude, 1 = longitude, 2 = time
        :return: list of tiles with data fetched and masked to the bounding box
        """
        nexus_max_tiles_per_query = 100
        try:
            tiles = \
                tile_service.find_tiles_in_box(min_lat, max_lat,
                                               min_lon, max_lon,
                                               dataset,
                                               start_time=start_time,
                                               end_time=end_time,
                                               fetch_data=False)
            assert (len(tiles) <= nexus_max_tiles_per_query)
        except Exception:
            # Narrowed from a bare ``except:`` — catches the AssertionError
            # above or a failed service query; split the region and recurse.
            if part_dim == 0:
                # Partition by latitude.
                mid_lat = (min_lat + max_lat) / 2
                nexus_tiles = SparkHandler.query_by_parts(tile_service,
                                                          min_lat, mid_lat,
                                                          min_lon, max_lon,
                                                          dataset,
                                                          start_time, end_time,
                                                          part_dim=part_dim)
                nexus_tiles.extend(SparkHandler.query_by_parts(tile_service,
                                                               mid_lat, max_lat,
                                                               min_lon, max_lon,
                                                               dataset,
                                                               start_time, end_time,
                                                               part_dim=part_dim))
            elif part_dim == 1:
                # Partition by longitude.
                mid_lon = (min_lon + max_lon) / 2
                nexus_tiles = SparkHandler.query_by_parts(tile_service,
                                                          min_lat, max_lat,
                                                          min_lon, mid_lon,
                                                          dataset,
                                                          start_time, end_time,
                                                          part_dim=part_dim)
                nexus_tiles.extend(SparkHandler.query_by_parts(tile_service,
                                                               min_lat, max_lat,
                                                               mid_lon, max_lon,
                                                               dataset,
                                                               start_time, end_time,
                                                               part_dim=part_dim))
            elif part_dim == 2:
                # Partition by time.
                mid_time = (start_time + end_time) / 2
                nexus_tiles = SparkHandler.query_by_parts(tile_service,
                                                          min_lat, max_lat,
                                                          min_lon, max_lon,
                                                          dataset,
                                                          start_time, mid_time,
                                                          part_dim=part_dim)
                nexus_tiles.extend(SparkHandler.query_by_parts(tile_service,
                                                               min_lat, max_lat,
                                                               min_lon, max_lon,
                                                               dataset,
                                                               mid_time, end_time,
                                                               part_dim=part_dim))
        else:
            # No exception, so query Cassandra for the tile data and mask it
            # to the requested bounding box.
            nexus_tiles = list(tile_service.fetch_data_for_tiles(*tiles))
            nexus_tiles = list(tile_service.mask_tiles_to_bbox(min_lat, max_lat,
                                                               min_lon, max_lon,
                                                               nexus_tiles))
        return nexus_tiles

    @staticmethod
    def _prune_tiles(nexus_tiles):
        """Remove (in place) any tile whose data is entirely masked."""
        del_ind = np.where([np.all(tile.data.mask) for tile in nexus_tiles])[0]
        # Delete from the end so earlier indices stay valid.
        for i in np.flipud(del_ind):
            del nexus_tiles[i]

    def _lat2ind(self, lat):
        """Latitude -> grid row index, using the recorded grid geometry."""
        return int((lat - self._minLatCent) / self._latRes)

    def _lon2ind(self, lon):
        """Longitude -> grid column index."""
        return int((lon - self._minLonCent) / self._lonRes)

    def _ind2lat(self, y):
        """Grid row index -> latitude of the cell center."""
        return self._minLatCent + y * self._latRes

    def _ind2lon(self, x):
        """Grid column index -> longitude of the cell center."""
        return self._minLonCent + x * self._lonRes

    def _create_nc_file_time1d(self, a, fname, varname, varunits=None,
                               fill=None):
        """Write a 1-d array of {'time','mean'} dicts to a NetCDF file."""
        # Deferred: netCDF4 is only needed when actually writing files.
        from netCDF4 import Dataset
        self.log.debug('a={0}'.format(a))
        self.log.debug('shape a = {0}'.format(a.shape))
        assert len(a.shape) == 1
        time_dim = len(a)
        rootgrp = Dataset(fname, "w", format="NETCDF4")
        rootgrp.createDimension("time", time_dim)
        vals = rootgrp.createVariable(varname, "f4", dimensions=("time",),
                                      fill_value=fill)
        times = rootgrp.createVariable("time", "f4", dimensions=("time",))
        vals[:] = [d['mean'] for d in a]
        times[:] = [d['time'] for d in a]
        if varunits is not None:
            vals.units = varunits
        times.units = 'seconds since 1970-01-01 00:00:00'
        rootgrp.close()

    def _create_nc_file_latlon2d(self, a, fname, varname, varunits=None,
                                 fill=None):
        """Write a 2-d (lat, lon) array to a NetCDF file with coordinate axes."""
        from netCDF4 import Dataset
        self.log.debug('a={0}'.format(a))
        self.log.debug('shape a = {0}'.format(a.shape))
        assert len(a.shape) == 2
        lat_dim, lon_dim = a.shape
        rootgrp = Dataset(fname, "w", format="NETCDF4")
        rootgrp.createDimension("lat", lat_dim)
        rootgrp.createDimension("lon", lon_dim)
        vals = rootgrp.createVariable(varname, "f4",
                                      dimensions=("lat", "lon",),
                                      fill_value=fill)
        lats = rootgrp.createVariable("lat", "f4", dimensions=("lat",))
        lons = rootgrp.createVariable("lon", "f4", dimensions=("lon",))
        vals[:, :] = a
        lats[:] = np.linspace(self._minLatCent,
                              self._maxLatCent, lat_dim)
        lons[:] = np.linspace(self._minLonCent,
                              self._maxLonCent, lon_dim)
        if varunits is not None:
            vals.units = varunits
        lats.units = "degrees north"
        lons.units = "degrees east"
        rootgrp.close()

    def _create_nc_file(self, a, fname, varname, **kwargs):
        """Default NetCDF writer: 2-d lat/lon grid."""
        self._create_nc_file_latlon2d(a, fname, varname, **kwargs)


def executeInitializers(config):
    """Run every registered initializer (each runs at most once)."""
    for wrapper in AVAILABLE_INITIALIZERS:
        wrapper.init(config)
"""
Copyright (c) 2016 Jet Propulsion Laboratory,
California Institute of Technology. All rights reserved
"""
import json

from webservice.NexusHandler import CalcHandler, nexus_handler, AVAILABLE_HANDLERS
from webservice.webmodel import NexusResults


@nexus_handler
class CapabilitiesListHandlerImpl(CalcHandler):
    """Endpoint that reports every registered algorithm handler."""
    name = "Capabilities"
    path = "/capabilities"
    description = "Lists the current capabilities of this Nexus system"
    params = {}
    singleton = True

    def __init__(self):
        CalcHandler.__init__(self)

    def calc(self, computeOptions, **args):
        """Describe each registered handler: name, path, description, parameters."""
        capabilities = [
            {
                "name": handler.name(),
                "path": handler.path(),
                "description": handler.description(),
                "parameters": handler.params(),
            }
            for handler in AVAILABLE_HANDLERS
        ]
        return CapabilitiesResults(capabilities)


class CapabilitiesResults(NexusResults):
    """NexusResults wrapper that serializes the capability list as JSON."""

    def __init__(self, capabilities):
        NexusResults.__init__(self)
        self.__capabilities = capabilities

    def toJson(self):
        return json.dumps(self.__capabilities, indent=4)
"""
Copyright (c) 2016 Jet Propulsion Laboratory,
California Institute of Technology. All rights reserved
"""
import json
import math
import numpy as np
from shapely.geometry import box
from scipy.stats import linregress
from itertools import groupby
from webservice.NexusHandler import NexusHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC
from webservice.webmodel import NexusProcessingException, NexusResults
from nexustiles.model.nexusmodel import get_approximate_value_for_lat_lon


@nexus_handler
class LongitudeLatitudeMapHandlerImpl(NexusHandler):
    name = "Correlation Map"
    path = "/correlationMap"
    description = "Computes a correlation map between two datasets given an arbitrary geographical area and time range"
    # BUG FIX: copy the shared spec before adding the handler-specific "res"
    # parameter.  Mutating DEFAULT_PARAMETERS_SPEC in place leaked "res" into
    # every other handler that uses the default spec.
    params = dict(DEFAULT_PARAMETERS_SPEC)
    params["res"] = {
        "name": "Resolution",
        "type": "float",
        "description": "The resolution of the resulting correlation map"
    }
    singleton = True

    def __init__(self):
        NexusHandler.__init__(self)

    def calc(self, computeOptions, **args):
        """Compute a per-cell linear regression between two datasets over a
        bounding box and time range, on a grid of the requested resolution.

        :raises NexusProcessingException: when no temporally co-located data exists
        """
        minLat = computeOptions.get_min_lat()
        maxLat = computeOptions.get_max_lat()
        minLon = computeOptions.get_min_lon()
        maxLon = computeOptions.get_max_lon()
        ds = computeOptions.get_dataset()
        startTime = computeOptions.get_start_time()
        endTime = computeOptions.get_end_time()
        resolution = computeOptions.get_decimal_arg("res", default=1.0)

        if not len(ds) == 2:
            raise Exception("Requires two datasets for comparison. Specify request parameter ds=Dataset_1,Dataset_2")

        ds1tiles = self._tile_service.find_tiles_in_polygon(box(minLon, minLat, maxLon, maxLat), ds[0], startTime,
                                                            endTime)
        ds2tiles = self._tile_service.find_tiles_in_polygon(box(minLon, minLat, maxLon, maxLat), ds[1], startTime,
                                                            endTime)

        matches = self._match_tiles(ds1tiles, ds2tiles)

        if len(matches) == 0:
            raise NexusProcessingException(reason="Could not find any data temporally co-located")

        # One empty stats cell per (lat, lon) grid point at the requested resolution.
        results = [[{
            'cnt': 0,
            'slope': 0,
            'intercept': 0,
            'r': 0,
            'p': 0,
            'stderr': 0,
            'lat': float(lat),
            'lon': float(lon)
        } for lon in np.arange(minLon, maxLon, resolution)] for lat in
            np.arange(minLat, maxLat, resolution)]

        for row in results:
            for stat in row:
                values_x = []
                values_y = []
                for tile_matches in matches:

                    tile_1_list = tile_matches[0]
                    value_1 = get_approximate_value_for_lat_lon(tile_1_list, stat["lat"], stat["lon"])

                    tile_2_list = tile_matches[1]
                    value_2 = get_approximate_value_for_lat_lon(tile_2_list, stat["lat"], stat["lon"])

                    # Only use time steps where both datasets have valid data here.
                    if not (math.isnan(value_1) or math.isnan(value_2)):
                        values_x.append(value_1)
                        values_y.append(value_2)

                if len(values_x) > 2 and len(values_y) > 2:
                    # Renamed from ``stats``, which shadowed an outer loop variable.
                    regression = linregress(values_x, values_y)

                    for key, value in zip(("slope", "intercept", "r", "p", "stderr"), regression):
                        # JSON can't represent nan/inf; fall back to their string form.
                        if math.isnan(value) or math.isinf(value):
                            stat[key] = str(value)
                        else:
                            stat[key] = value
                    stat["cnt"] = len(values_x)

        return CorrelationResults(results)

    def _match_tiles(self, tiles_1, tiles_2):
        """Group both tile lists by start time and keep the timestamps present
        in both datasets.

        :return: list of [ds1_tiles, ds2_tiles] pairs, one per shared timestamp
        """
        date_map = {}

        # tiles_1 is processed first, so in each match the first entry always
        # belongs to dataset 1 and the second to dataset 2.
        for tile_list in (tiles_1, tiles_2):
            tile_list = sorted(tile_list, key=lambda tile: tile.min_time)
            for tile_start_time, tiles in groupby(tile_list, lambda tile: tile.min_time):
                date_map.setdefault(tile_start_time, []).append(list(tiles))

        # Exactly two entries means the timestamp exists in both datasets.
        # .items() (not .iteritems()) keeps this Python-2/3 compatible.
        matches = [tiles for a_time, tiles in date_map.items() if len(tiles) == 2]

        return matches


class CorrelationResults(NexusResults):
    """NexusResults wrapper around the correlation grid."""

    def __init__(self, results):
        NexusResults.__init__(self)
        self.results = results

    def toJson(self):
        json_d = {
            "stats": {},
            "meta": [None, None],
            "data": self.results
        }
        return json.dumps(json_d, indent=4)
"""
Copyright (c) 2016 Jet Propulsion Laboratory,
California Institute of Technology. All rights reserved
"""
import sys
import traceback
from datetime import datetime, timedelta
from multiprocessing.dummy import Pool, Manager
from shapely.geometry import box

import numpy as np
import pytz
from nexustiles.nexustiles import NexusTileService, NexusTileServiceException

from webservice.NexusHandler import NexusHandler, nexus_handler
from webservice.webmodel import NexusResults, NexusProcessingException

# Sentinel placed on the work queue to tell each pool worker to stop.
SENTINEL = 'STOP'


@nexus_handler
class DailyDifferenceAverageImpl(NexusHandler):
    name = "Daily Difference Average"
    path = "/dailydifferenceaverage"
    description = "Subtracts data in box in Dataset 1 from Dataset 2, then averages the difference per day."
    params = {
        "ds1": {
            "name": "Dataset 1",
            "type": "string",
            "description": "The first Dataset shortname to use in calculation"
        },
        "ds2": {
            "name": "Dataset 2",
            "type": "string",
            "description": "The second Dataset shortname to use in calculation"
        },
        "minLon": {
            "name": "Minimum Longitude",
            "type": "float",
            "description": "Minimum (Western) bounding box Longitude"
        },
        "minLat": {
            "name": "Minimum Latitude",
            "type": "float",
            "description": "Minimum (Southern) bounding box Latitude"
        },
        "maxLon": {
            "name": "Maximum Longitude",
            "type": "float",
            "description": "Maximum (Eastern) bounding box Longitude"
        },
        "maxLat": {
            "name": "Maximum Latitude",
            "type": "float",
            "description": "Maximum (Northern) bounding box Latitude"
        },
        "startTime": {
            "name": "Start Time",
            "type": "long integer",
            "description": "Starting time in milliseconds since midnight Jan. 1st, 1970 UTC"
        },
        "endTime": {
            "name": "End Time",
            "type": "long integer",
            "description": "Ending time in milliseconds since midnight Jan. 1st, 1970 UTC"
        }
    }
    singleton = True

    def __init__(self):
        NexusHandler.__init__(self, skipCassandra=True)

    def calc(self, request, **args):
        """Compute per-day average ds1-ds2 differences over a bounding box.

        With the "simple" request argument present, a PNG plot is written to
        disk and the raw list is returned instead of a NexusResults object.
        """
        min_lat, max_lat, min_lon, max_lon = request.get_min_lat(), request.get_max_lat(), request.get_min_lon(), request.get_max_lon()
        dataset1 = request.get_argument("ds1", None)
        dataset2 = request.get_argument("ds2", None)
        start_time = request.get_start_time()
        end_time = request.get_end_time()

        simple = request.get_argument("simple", None) is not None

        averagebyday = self.get_daily_difference_average_for_box(min_lat, max_lat, min_lon, max_lon, dataset1, dataset2,
                                                                 start_time, end_time)

        averagebyday = sorted(averagebyday, key=lambda dayavg: dayavg[0])

        if simple:
            # Deferred import: matplotlib is only needed for this debug path.
            import matplotlib.pyplot as plt
            from matplotlib.dates import date2num

            times = [date2num(self.date_from_ms(dayavg[0])) for dayavg in averagebyday]
            means = [dayavg[1] for dayavg in averagebyday]
            plt.plot_date(times, means, ls='solid')

            plt.xlabel('Date')
            plt.xticks(rotation=70)
            plt.ylabel(u'Difference from 5-Day mean (\u00B0C)')
            plt.title('Sea Surface Temperature (SST) Anomalies')
            plt.grid(True)
            plt.tight_layout()
            plt.savefig("test.png")

            return averagebyday, None, None
        else:

            result = NexusResults(
                results=[[{'time': dayms, 'mean': avg, 'ds': 0}] for dayms, avg in averagebyday],
                stats={},
                meta=self.get_meta())

            result.extendMeta(min_lat, max_lat, min_lon, max_lon, "", start_time, end_time)
            result.meta()['label'] = u'Difference from 5-Day mean (\u00B0C)'

            return result

    def date_from_ms(self, dayms):
        """Convert epoch milliseconds to a naive datetime.

        timedelta's fourth positional argument is milliseconds.
        """
        base_datetime = datetime(1970, 1, 1)
        delta = timedelta(0, 0, 0, dayms)
        return base_datetime + delta

    def get_meta(self):
        """Static result metadata describing this product."""
        meta = {
            "title": "Sea Surface Temperature (SST) Anomalies",
            "description": "SST anomolies are departures from the 5-day pixel mean",
            "units": u'\u00B0C',
        }
        return meta

    def get_daily_difference_average_for_box(self, min_lat, max_lat, min_lon, max_lon, dataset1, dataset2,
                                             start_time,
                                             end_time):
        """Return [(day_in_ms, average_difference)] for each day in the range.

        Runs single-threaded when [multiprocessing] maxprocesses == 1,
        otherwise fans the per-day work out through a thread pool fed by
        work/done queues.

        :raises NexusProcessingException: if any worker reports an error
        """
        daysinrange = self._tile_service.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset1,
                                                                start_time, end_time)

        maxprocesses = int(self.algorithm_config.get("multiprocessing", "maxprocesses"))

        if maxprocesses == 1:
            calculator = DailyDifferenceAverageCalculator()
            averagebyday = []
            for dayinseconds in daysinrange:
                result = calculator.calc_average_diff_on_day(min_lat, max_lat, min_lon, max_lon, dataset1, dataset2,
                                                             dayinseconds)
                averagebyday.append((result[0], result[1]))
        else:
            # Create a task to calc average difference for each day
            manager = Manager()
            work_queue = manager.Queue()
            done_queue = manager.Queue()
            for dayinseconds in daysinrange:
                work_queue.put(
                    ('calc_average_diff_on_day', min_lat, max_lat, min_lon, max_lon, dataset1, dataset2, dayinseconds))
            # One SENTINEL per worker so each one shuts down cleanly.
            for _ in range(maxprocesses):
                work_queue.put(SENTINEL)

            # Start new processes to handle the work.
            # NOTE(review): pool_worker is defined later in this module.
            pool = Pool(maxprocesses)
            for _ in range(maxprocesses):
                pool.apply_async(pool_worker, (work_queue, done_queue))
            pool.close()

            # Collect the results as [(day (in ms), average difference for that day)]
            averagebyday = []
            for i in range(len(daysinrange)):
                result = done_queue.get()
                if result[0] == 'error':
                    # Python-3-compatible replacement for ``print >> sys.stderr``.
                    sys.stderr.write("%s\n" % (result[1],))
                    raise NexusProcessingException(reason="Error calculating average by day.")
                rdata = result
                averagebyday.append((rdata[0], rdata[1]))

            pool.terminate()
            manager.shutdown()

        return averagebyday


class DailyDifferenceAverageCalculator(object):
    def __init__(self):
        self.__tile_service = NexusTileService()

    def calc_average_diff_on_day(self, min_lat, max_lat, min_lon, max_lon, dataset1, dataset2, timeinseconds):

        day_of_year = datetime.fromtimestamp(timeinseconds, pytz.utc).timetuple().tm_yday

        ds1_nexus_tiles = self.__tile_service.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon,
                                                                            dataset1,
                                                                            timeinseconds)
+ # Initialize list of differences + differences = [] + # For each ds1tile + for ds1_tile in ds1_nexus_tiles: + # Get tile for ds2 using bbox from ds1_tile and day ms + try: + ds2_tile = self.__tile_service.find_tile_by_polygon_and_most_recent_day_of_year( + box(ds1_tile.bbox.min_lon, ds1_tile.bbox.min_lat, ds1_tile.bbox.max_lon, ds1_tile.bbox.max_lat), + dataset2, day_of_year)[0] + # Subtract ds2 tile from ds1 tile + diff = np.subtract(ds1_tile.data, ds2_tile.data) + except NexusTileServiceException: + # This happens when there is data in ds1tile but all NaNs in ds2tile because the + # Solr query being used filters out results where stats_count = 0. + # Technically, this should never happen if ds2 is a climatology generated in part from ds1 + # and it is probably a data error + + # For now, just treat ds2 as an array of all masked data (which essentially discards the ds1 data) + ds2_tile = np.ma.masked_all(ds1_tile.data.shape) + diff = np.subtract(ds1_tile.data, ds2_tile) + + # Put results in list of differences + differences.append(np.ma.array(diff).ravel()) + + # Average List of differences + diffaverage = np.ma.mean(differences).item() + # Return Average by day + return int(timeinseconds), diffaverage + + +def pool_worker(work_queue, done_queue): + try: + calculator = DailyDifferenceAverageCalculator() + + for work in iter(work_queue.get, SENTINEL): + scifunction = work[0] + args = work[1:] + result = calculator.__getattribute__(scifunction)(*args) + done_queue.put(result) + except Exception as e: + e_str = traceback.format_exc(e) + done_queue.put(('error', e_str)) http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/DataInBoundsSearch.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/DataInBoundsSearch.py b/analysis/webservice/algorithms/DataInBoundsSearch.py new file mode 100644 index 0000000..f9aa609 --- /dev/null +++ 
"""
Copyright (c) 2016 Jet Propulsion Laboratory,
California Institute of Technology. All rights reserved
"""
import logging
from datetime import datetime
from pytz import timezone

from webservice.NexusHandler import NexusHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC
from webservice.webmodel import NexusResults, NexusProcessingException

# Unix epoch as a timezone-aware datetime, used to convert request datetimes
# to integer seconds-since-epoch.
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'


@nexus_handler
class DataInBoundsSearchHandlerImpl(NexusHandler):
    """Fetches raw point values for one dataset inside a bounding box and
    time range, optionally enriched with wind metadata."""

    name = "Data In-Bounds Search"
    path = "/datainbounds"
    description = "Fetches point values for a given dataset and geographical area"
    params = {
        "ds": {
            "name": "Dataset",
            "type": "string",
            "description": "The Dataset shortname to use in calculation. Required"
        },
        "parameter": {
            "name": "Parameter",
            "type": "string",
            "description": "The parameter of interest. One of 'sst', 'sss', 'wind'. Required"
        },
        "b": {
            "name": "Bounding box",
            "type": "comma-delimited float",
            "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
        },
        "startTime": {
            "name": "Start Time",
            "type": "string",
            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
        },
        "endTime": {
            "name": "End Time",
            "type": "string",
            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
        }
    }
    singleton = True

    def __init__(self):
        NexusHandler.__init__(self)
        self.log = logging.getLogger(__name__)

    def parse_arguments(self, request):
        """Validate request arguments.

        Returns (ds, parameter, start_seconds, end_seconds, bounding_polygon).
        Raises NexusProcessingException (HTTP 400) for any missing/invalid
        argument.
        """
        self.log.debug("Parsing arguments")

        try:
            ds = request.get_dataset()[0]
        except:
            raise NexusProcessingException(reason="'ds' argument is required", code=400)

        # 'parameter' may be omitted (None); any other value must be one of
        # the known parameter names.
        parameter_s = request.get_argument('parameter', None)
        if parameter_s not in ['sst', 'sss', 'wind', None]:
            raise NexusProcessingException(
                reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400)

        try:
            start_time = request.get_start_datetime()
            start_time = long((start_time - EPOCH).total_seconds())
        except:
            raise NexusProcessingException(
                reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
                code=400)
        try:
            end_time = request.get_end_datetime()
            end_time = long((end_time - EPOCH).total_seconds())
        except:
            raise NexusProcessingException(
                reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
                code=400)

        if start_time > end_time:
            raise NexusProcessingException(
                reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
                    request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
                code=400)

        try:
            bounding_polygon = request.get_bounding_polygon()
        except:
            raise NexusProcessingException(
                reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
                code=400)

        return ds, parameter_s, start_time, end_time, bounding_polygon

    def calc(self, computeOptions, **args):
        """Collect every point in the box/time range into a DataInBoundsResult."""
        ds, parameter, start_time, end_time, bounding_polygon = self.parse_arguments(computeOptions)

        includemeta = computeOptions.get_include_meta()

        # shapely bounds order is (minx, miny, maxx, maxy) ==
        # (min_lon, min_lat, max_lon, max_lat).
        min_lat = bounding_polygon.bounds[1]
        max_lat = bounding_polygon.bounds[3]
        min_lon = bounding_polygon.bounds[0]
        max_lon = bounding_polygon.bounds[2]

        tiles = self._tile_service.get_tiles_bounded_by_box(min_lat, max_lat, min_lon, max_lon, ds, start_time,
                                                            end_time)

        data = []
        for tile in tiles:
            for nexus_point in tile.nexus_point_generator():

                point = dict()
                point['id'] = tile.tile_id

                if parameter == 'sst':
                    point['sst'] = nexus_point.data_val
                elif parameter == 'sss':
                    point['sss'] = nexus_point.data_val
                elif parameter == 'wind':
                    point['wind_u'] = nexus_point.data_val
                    # Companion wind components live in tile metadata and may
                    # be absent for a given point; missing entries are skipped.
                    try:
                        point['wind_v'] = tile.meta_data['wind_v'][tuple(nexus_point.index)]
                    except (KeyError, IndexError):
                        pass
                    try:
                        point['wind_direction'] = tile.meta_data['wind_dir'][tuple(nexus_point.index)]
                    except (KeyError, IndexError):
                        pass
                    try:
                        point['wind_speed'] = tile.meta_data['wind_speed'][tuple(nexus_point.index)]
                    except (KeyError, IndexError):
                        pass
                else:
                    # No parameter requested: only the tile id is reported.
                    pass

                data.append({
                    'latitude': nexus_point.latitude,
                    'longitude': nexus_point.longitude,
                    'time': nexus_point.time,
                    'data': [
                        point
                    ]
                })

        if includemeta and len(tiles) > 0:
            meta = [tile.get_summary() for tile in tiles]
        else:
            meta = None

        result = DataInBoundsResult(
            results=data,
            stats={},
            meta=meta)

        result.extendMeta(min_lat, max_lat, min_lon, max_lon, "", start_time, end_time)

        return result


class DataInBoundsResult(NexusResults):
    """NexusResults specialization that adds CSV serialization."""

    def toCSV(self):
        """Render the results as CSV joined by CRLF.

        The header row is built from the fields present in the FIRST result
        only, so all rows are assumed to carry the same parameter fields.
        """
        rows = []

        headers = [
            "id",
            "lon",
            "lat",
            "time"
        ]

        for i, result in enumerate(self.results()):
            cols = []

            cols.append(str(result['data'][0]['id']))
            cols.append(str(result['longitude']))
            cols.append(str(result['latitude']))
            cols.append(datetime.utcfromtimestamp(result["time"]).strftime('%Y-%m-%dT%H:%M:%SZ'))
            if 'sst' in result['data'][0]:
                cols.append(str(result['data'][0]['sst']))
                if i == 0:
                    headers.append("sea_water_temperature")
            elif 'sss' in result['data'][0]:
                cols.append(str(result['data'][0]['sss']))
                if i == 0:
                    headers.append("sea_water_salinity")
            elif 'wind_u' in result['data'][0]:
                cols.append(str(result['data'][0]['wind_u']))
                cols.append(str(result['data'][0]['wind_v']))
                cols.append(str(result['data'][0]['wind_direction']))
                cols.append(str(result['data'][0]['wind_speed']))
                if i == 0:
                    headers.append("eastward_wind")
                    headers.append("northward_wind")
                    headers.append("wind_direction")
                    headers.append("wind_speed")

            if i == 0:
                rows.append(",".join(headers))
            rows.append(",".join(cols))

        return "\r\n".join(rows)
import json
from webservice.NexusHandler import NexusHandler
from webservice.NexusHandler import nexus_handler
from webservice.webmodel import cached


@nexus_handler
class DataSeriesListHandlerImpl(NexusHandler):
    """Lists the datasets currently available for analysis."""

    name = "Dataset List"
    path = "/list"
    description = "Lists datasets currently available for analysis"
    params = {}

    def __init__(self):
        NexusHandler.__init__(self, skipCassandra=True)

    @cached(ttl=(60 * 60 * 1000))  # cache the dataset listing for one hour
    def calc(self, computeOptions, **args):
        """Return the tile service's dataset listing in a JSON-able wrapper."""

        class SimpleResult(object):
            """Minimal result wrapper exposing only toJson()."""

            def __init__(self, result):
                self.result = result

            def toJson(self):
                return json.dumps(self.result)

        dataset_listing = self._tile_service.get_dataseries_list()
        return SimpleResult(dataset_listing)
import time

from webservice.NexusHandler import CalcHandler
from webservice.NexusHandler import nexus_handler


@nexus_handler
class DelayHandlerImpl(CalcHandler):
    """Test endpoint that simply blocks for a fixed interval."""

    name = "Delay"
    path = "/delay"
    description = "Waits a little while"
    params = {}
    singleton = True

    def __init__(self):
        CalcHandler.__init__(self)

    def calc(self, computeOptions, **args):
        """Sleep ten seconds, then return an empty-JSON result object."""
        # Deliberate delay to simulate a slow computation.
        time.sleep(10)

        class SimpleResult(object):
            """Result whose JSON serialization is the empty string."""

            def toJson(self):
                return ""

        return SimpleResult()
from webservice.NexusHandler import CalcHandler, nexus_handler


@nexus_handler
class ErrorTosserHandler(CalcHandler):
    """Test endpoint that always fails, for exercising error handling."""

    name = "MakeError"
    path = "/makeerror"
    description = "Causes an error"
    params = {}
    singleton = True

    def __init__(self):
        CalcHandler.__init__(self)

    def calc(self, computeOptions, **args):
        """Always raises ZeroDivisionError; the return is never reached."""
        # Deliberate float division by zero so callers can test the error
        # path. (Previously assigned to an unused local; the dead
        # commented-out alternatives have been removed.)
        100 / 0.0
        return {}, None, None
import json

from webservice.NexusHandler import NexusHandler, nexus_handler, AVAILABLE_HANDLERS
from webservice.webmodel import NexusResults


@nexus_handler
class HeartbeatHandlerImpl(NexusHandler):
    """Reports the health of the Nexus backend services."""

    name = "Backend Services Status"
    path = "/heartbeat"
    description = "Returns health status of Nexus backend services"
    params = {}
    singleton = True

    def __init__(self):
        NexusHandler.__init__(self, skipCassandra=True)

    def calc(self, computeOptions, **args):
        """Ping Solr and report overall online status as JSON."""

        class SimpleResult(object):
            """Minimal result wrapper exposing only toJson()."""

            def __init__(self, result):
                self.result = result

            def toJson(self):
                return json.dumps(self.result)

        solrOnline = self._tile_service.pingSolr()

        # Not sure how to best check cassandra cluster status so just
        # assume it is up for now.
        cassOnline = True

        status = {"online": bool(solrOnline and cassOnline)}
        return SimpleResult(status)
"""
Copyright (c) 2016 Jet Propulsion Laboratory,
California Institute of Technology. All rights reserved
"""
import itertools
import logging
import traceback
from cStringIO import StringIO
from datetime import datetime
from multiprocessing.dummy import Pool, Manager

import matplotlib.pyplot as plt
import mpld3
import numpy as np
from matplotlib import cm
from matplotlib.ticker import FuncFormatter

from webservice.NexusHandler import NexusHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC
from webservice.webmodel import NexusProcessingException, NexusResults

# Sentinel placed on the work queue to tell pool workers to stop consuming.
SENTINEL = 'STOP'
# Plot-axis discriminators shared by the handlers, results and pool workers.
LATITUDE = 0
LONGITUDE = 1


class LongitudeHofMoellerCalculator(object):
    """Computes per-longitude summary statistics for a single tile."""

    def longitude_time_hofmoeller_stats(self, tile, index):
        """Return {'sequence', 'time', 'lons': [stats per longitude]} where
        each entry holds cnt/avg/max/min/std of the tile values at that
        longitude."""
        stat = {
            'sequence': index,
            'time': np.ma.min(tile.times),
            'lons': []
        }

        points = list(tile.nexus_point_generator())
        # groupby requires its input sorted on the grouping key.
        data = sorted(points, key=lambda p: p.longitude)
        points_by_lon = itertools.groupby(data, key=lambda p: p.longitude)

        for lon, points_at_lon in points_by_lon:
            values_at_lon = np.array([point.data_val for point in points_at_lon])
            stat['lons'].append({
                'longitude': float(lon),
                'cnt': len(values_at_lon),
                'avg': np.mean(values_at_lon).item(),
                'max': np.max(values_at_lon).item(),
                'min': np.min(values_at_lon).item(),
                'std': np.std(values_at_lon).item()
            })

        return stat


class LatitudeHofMoellerCalculator(object):
    """Computes per-latitude summary statistics for a single tile."""

    def latitude_time_hofmoeller_stats(self, tile, index):
        """Return {'sequence', 'time', 'lats': [stats per latitude]} where
        each entry holds cnt/avg/max/min/std of the tile values at that
        latitude."""
        stat = {
            'sequence': index,
            'time': np.ma.min(tile.times),
            'lats': []
        }

        points = list(tile.nexus_point_generator())
        # groupby requires its input sorted on the grouping key.
        data = sorted(points, key=lambda p: p.latitude)
        points_by_lat = itertools.groupby(data, key=lambda p: p.latitude)

        for lat, points_at_lat in points_by_lat:
            values_at_lat = np.array([point.data_val for point in points_at_lat])

            stat['lats'].append({
                'latitude': float(lat),
                'cnt': len(values_at_lat),
                'avg': np.mean(values_at_lat).item(),
                'max': np.max(values_at_lat).item(),
                'min': np.min(values_at_lat).item(),
                'std': np.std(values_at_lat).item()
            })

        return stat


class BaseHoffMoellerHandlerImpl(NexusHandler):
    """Shared deseasoning logic for the latitude and longitude handlers."""

    def __init__(self):
        NexusHandler.__init__(self)
        self.log = logging.getLogger(__name__)

    def applyDeseasonToHofMoellerByField(self, results, pivot="lats", field="avg", append=True):
        """Add a '<field>Seasonal' key to every pivot entry by subtracting
        the mean of the same step-of-12 position across all results.

        NOTE(review): the 12-step stride presumes monthly time steps —
        confirm against how the tiles are binned.
        """
        shape = (len(results), len(results[0][pivot]))
        # With 12 or fewer steps there is no repeated cycle to estimate.
        if shape[0] <= 12:
            return results
        for a in range(0, 12):
            values = []
            for b in range(a, len(results), 12):
                values.append(np.average([l[field] for l in results[b][pivot]]))
            avg = np.average(values)

            for b in range(a, len(results), 12):
                for l in results[b][pivot]:
                    l["%sSeasonal" % field] = l[field] - avg

        return results

    def applyDeseasonToHofMoeller(self, results, pivot="lats", append=True):
        """Deseason the avg, min and max fields in place and return results."""
        results = self.applyDeseasonToHofMoellerByField(results, pivot, field="avg", append=append)
        results = self.applyDeseasonToHofMoellerByField(results, pivot, field="min", append=append)
        results = self.applyDeseasonToHofMoellerByField(results, pivot, field="max", append=append)
        return results


@nexus_handler
class LatitudeTimeHoffMoellerHandlerImpl(BaseHoffMoellerHandlerImpl):
    """Computes a latitude/time Hofmoeller plot for a box and time range."""

    name = "Latitude/Time HofMoeller"
    path = "/latitudeTimeHofMoeller"
    description = "Computes a latitude/time HofMoeller plot given an arbitrary geographical area and time range"
    params = DEFAULT_PARAMETERS_SPEC
    singleton = True

    def __init__(self):
        BaseHoffMoellerHandlerImpl.__init__(self)

    def calc(self, computeOptions, **args):
        """Fetch tiles, compute per-tile latitude stats (serially or via a
        worker pool), deseason, and wrap in HoffMoellerResults."""
        tiles = self._tile_service.get_tiles_bounded_by_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(),
                                                            computeOptions.get_min_lon(), computeOptions.get_max_lon(),
                                                            computeOptions.get_dataset()[0],
                                                            computeOptions.get_start_time(),
                                                            computeOptions.get_end_time())

        if len(tiles) == 0:
            raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe")

        maxprocesses = int(self.algorithm_config.get("multiprocessing", "maxprocesses"))

        results = []
        if maxprocesses == 1:
            # Serial path.
            calculator = LatitudeHofMoellerCalculator()
            for x, tile in enumerate(tiles):
                result = calculator.latitude_time_hofmoeller_stats(tile, x)
                results.append(result)
        else:
            manager = Manager()
            work_queue = manager.Queue()
            done_queue = manager.Queue()
            for x, tile in enumerate(tiles):
                work_queue.put(
                    ('latitude_time_hofmoeller_stats', tile, x))
            # One sentinel per worker so each worker loop terminates.
            [work_queue.put(SENTINEL) for _ in xrange(0, maxprocesses)]

            # Start new processes to handle the work
            pool = Pool(maxprocesses)
            [pool.apply_async(pool_worker, (LATITUDE, work_queue, done_queue)) for _ in xrange(0, maxprocesses)]
            pool.close()

            # Collect the results
            for x, tile in enumerate(tiles):
                result = done_queue.get()
                # Worker failures arrive as {'error': traceback}; successful
                # results are stat dicts without an 'error' key.
                try:
                    error_str = result['error']
                    self.log.error(error_str)
                    raise NexusProcessingException(reason="Error calculating latitude_time_hofmoeller_stats.")
                except KeyError:
                    pass

                results.append(result)

            pool.terminate()
            manager.shutdown()

        # Order results chronologically before deseasoning.
        results = sorted(results, key=lambda entry: entry["time"])

        results = self.applyDeseasonToHofMoeller(results)

        result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LATITUDE)
        return result


@nexus_handler
class LongitudeTimeHoffMoellerHandlerImpl(BaseHoffMoellerHandlerImpl):
    """Computes a longitude/time Hofmoeller plot for a box and time range."""

    name = "Longitude/Time HofMoeller"
    path = "/longitudeTimeHofMoeller"
    description = "Computes a longitude/time HofMoeller plot given an arbitrary geographical area and time range"
    params = DEFAULT_PARAMETERS_SPEC
    singleton = True

    def __init__(self):
        BaseHoffMoellerHandlerImpl.__init__(self)

    def calc(self, computeOptions, **args):
        """Fetch tiles, compute per-tile longitude stats (serially or via a
        worker pool), deseason, and wrap in HoffMoellerResults."""
        tiles = self._tile_service.get_tiles_bounded_by_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(),
                                                            computeOptions.get_min_lon(), computeOptions.get_max_lon(),
                                                            computeOptions.get_dataset()[0],
                                                            computeOptions.get_start_time(),
                                                            computeOptions.get_end_time())

        if len(tiles) == 0:
            raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe")

        maxprocesses = int(self.algorithm_config.get("multiprocessing", "maxprocesses"))

        results = []
        if maxprocesses == 1:
            # Serial path.
            calculator = LongitudeHofMoellerCalculator()
            for x, tile in enumerate(tiles):
                result = calculator.longitude_time_hofmoeller_stats(tile, x)
                results.append(result)
        else:
            manager = Manager()
            work_queue = manager.Queue()
            done_queue = manager.Queue()
            for x, tile in enumerate(tiles):
                work_queue.put(
                    ('longitude_time_hofmoeller_stats', tile, x))
            # One sentinel per worker so each worker loop terminates.
            [work_queue.put(SENTINEL) for _ in xrange(0, maxprocesses)]

            # Start new processes to handle the work
            pool = Pool(maxprocesses)
            [pool.apply_async(pool_worker, (LONGITUDE, work_queue, done_queue)) for _ in xrange(0, maxprocesses)]
            pool.close()

            # Collect the results
            for x, tile in enumerate(tiles):
                result = done_queue.get()
                # Worker failures arrive as {'error': traceback}; successful
                # results are stat dicts without an 'error' key.
                try:
                    error_str = result['error']
                    self.log.error(error_str)
                    raise NexusProcessingException(reason="Error calculating longitude_time_hofmoeller_stats.")
                except KeyError:
                    pass

                results.append(result)

            pool.terminate()
            manager.shutdown()

        # Order results chronologically before deseasoning.
        results = sorted(results, key=lambda entry: entry["time"])

        results = self.applyDeseasonToHofMoeller(results, pivot="lons")

        result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LONGITUDE)
        return result


class HoffMoellerResults(NexusResults):
    """Result wrapper that can render itself as a Hofmoeller PNG."""

    LATITUDE = 0
    LONGITUDE = 1

    def __init__(self, results=None, meta=None, stats=None, computeOptions=None, **args):
        NexusResults.__init__(self, results=results, meta=meta, stats=stats, computeOptions=computeOptions)
        # 'type' selects which axis renderer toImage() dispatches to.
        self.__type = args['type']

    def createHoffmueller(self, data, coordSeries, timeSeries, coordName, title, interpolate='nearest'):
        """Render the 2D grid `data` as a PNG and return the image bytes."""
        cmap = cm.coolwarm
        # ls = LightSource(315, 45)
        # rgb = ls.shade(data, cmap)

        fig, ax = plt.subplots()
        fig.set_size_inches(11.0, 8.5)
        cax = ax.imshow(data, interpolation=interpolate, cmap=cmap)

        def yFormatter(y, pos):
            # Map pixel rows back to coordinate labels, truncated to 2 d.p.
            if y < len(coordSeries):
                return "%s $^\circ$" % (int(coordSeries[int(y)] * 100.0) / 100.)
            else:
                return ""

        def xFormatter(x, pos):
            # Map pixel columns back to month/year labels.
            if x < len(timeSeries):
                return timeSeries[int(x)].strftime('%b %Y')
            else:
                return ""

        ax.xaxis.set_major_formatter(FuncFormatter(xFormatter))
        ax.yaxis.set_major_formatter(FuncFormatter(yFormatter))

        ax.set_title(title)
        ax.set_ylabel(coordName)
        ax.set_xlabel('Date')

        fig.colorbar(cax)
        fig.autofmt_xdate()

        labels = ['point {0}'.format(i + 1) for i in range(len(data))]
        # plugins.connect(fig, plugins.MousePosition(fontsize=14))
        tooltip = mpld3.plugins.PointLabelTooltip(cax, labels=labels)

        sio = StringIO()
        plt.savefig(sio, format='png')
        return sio.getvalue()

    def createLongitudeHoffmueller(self, res, meta):
        """Build the (longitude x time) grid from stats and render it."""
        lonSeries = [m['longitude'] for m in res[0]['lons']]
        # NOTE(review): divides by 1000, implying 'time' is in milliseconds
        # here — confirm against the calculators, which store tile times
        # directly.
        timeSeries = [datetime.fromtimestamp(m['time'] / 1000) for m in res]

        data = np.zeros((len(lonSeries), len(timeSeries)))

        # NOTE(review): `self.computeOptions` is a bound method and is never
        # None, so the guard always takes the call branch — confirm intent.
        plotSeries = self.computeOptions().get_plot_series(default="avg") if self.computeOptions is not None else None
        if plotSeries is None:
            plotSeries = "avg"

        for t in range(0, len(timeSeries)):
            timeSet = res[t]
            for l in range(0, len(lonSeries)):
                latSet = timeSet['lons'][l]
                value = latSet[plotSeries]
                # Flip vertically so the first coordinate renders at the top.
                data[len(lonSeries) - l - 1][t] = value

        title = meta['title']
        source = meta['source']
        dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))

        return self.createHoffmueller(data, lonSeries, timeSeries, "Longitude",
                                      "%s\n%s\n%s" % (title, source, dateRange), interpolate='nearest')

    def createLatitudeHoffmueller(self, res, meta):
        """Build the (latitude x time) grid from stats and render it."""
        latSeries = [m['latitude'] for m in res[0]['lats']]
        # NOTE(review): divides by 1000, implying 'time' is in milliseconds
        # here — confirm against the calculators, which store tile times
        # directly.
        timeSeries = [datetime.fromtimestamp(m['time'] / 1000) for m in res]

        data = np.zeros((len(latSeries), len(timeSeries)))

        # NOTE(review): `self.computeOptions` is a bound method and is never
        # None, so the guard always takes the call branch — confirm intent.
        plotSeries = self.computeOptions().get_plot_series(default="avg") if self.computeOptions is not None else None
        if plotSeries is None:
            plotSeries = "avg"

        for t in range(0, len(timeSeries)):
            timeSet = res[t]
            for l in range(0, len(latSeries)):
                latSet = timeSet['lats'][l]
                value = latSet[plotSeries]
                # Flip vertically so the first coordinate renders at the top.
                data[len(latSeries) - l - 1][t] = value

        title = meta['title']
        source = meta['source']
        dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))

        return self.createHoffmueller(data, latSeries, timeSeries, "Latitude",
                                      title="%s\n%s\n%s" % (title, source, dateRange), interpolate='nearest')

    def toImage(self):
        """Dispatch to the renderer matching the plot type set at creation."""
        res = self.results()
        meta = self.meta()

        if self.__type == HoffMoellerResults.LATITUDE:
            return self.createLatitudeHoffmueller(res, meta)
        elif self.__type == HoffMoellerResults.LONGITUDE:
            return self.createLongitudeHoffmueller(res, meta)
        else:
            raise Exception("Unsupported HoffMoeller Plot Type")


def pool_worker(type, work_queue, done_queue):
    """Worker loop: builds the calculator matching `type` (LATITUDE or
    LONGITUDE) and applies queued (method-name, tile, index) items until a
    SENTINEL is consumed; failures go on done_queue as {'error': traceback}."""
    try:

        if type == LATITUDE:
            calculator = LatitudeHofMoellerCalculator()
        elif type == LONGITUDE:
            calculator = LongitudeHofMoellerCalculator()

        for work in iter(work_queue.get, SENTINEL):
            scifunction = work[0]
            args = work[1:]
            result = calculator.__getattribute__(scifunction)(*args)
            done_queue.put(result)

    except Exception as e:
        # NOTE(review): traceback.format_exc's parameter is `limit`, not the
        # exception; passing `e` happens to work under Python 2 but is likely
        # unintended — confirm and drop the argument.
        e_str = traceback.format_exc(e)
        done_queue.put({'error': e_str})
"""
Copyright (c) 2016 Jet Propulsion Laboratory,
California Institute of Technology. All rights reserved
"""
import logging
import math
from datetime import datetime

from pytz import timezone
from shapely.geometry import box

from webservice.NexusHandler import NexusHandler, nexus_handler
from webservice.webmodel import NexusResults, NexusProcessingException

SENTINEL = 'STOP'
# Unix epoch as a timezone-aware datetime, used to convert request datetimes
# to integer seconds-since-epoch.
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
# Module-level tile service shared with worker (sub)processes; initialized by
# pool_initializer() / lat_lon_map_driver().
tile_service = None


@nexus_handler
class LongitudeLatitudeMapHandlerImpl(NexusHandler):
    """Computes per-point time statistics (regression + summary stats) over a
    geographical area and time range."""

    name = "Longitude/Latitude Time Average Map"
    path = "/longitudeLatitudeMap"
    description = "Computes a Latitude/Longitude Time Average plot given an arbitrary geographical area and time range"
    params = {
        "ds": {
            "name": "Dataset",
            "type": "string",
            "description": "One or more comma-separated dataset shortnames"
        },
        "minLat": {
            "name": "Minimum Latitude",
            "type": "float",
            "description": "Minimum (Southern) bounding box Latitude"
        },
        "maxLat": {
            "name": "Maximum Latitude",
            "type": "float",
            "description": "Maximum (Northern) bounding box Latitude"
        },
        "minLon": {
            "name": "Minimum Longitude",
            "type": "float",
            "description": "Minimum (Western) bounding box Longitude"
        },
        "maxLon": {
            "name": "Maximum Longitude",
            "type": "float",
            "description": "Maximum (Eastern) bounding box Longitude"
        },
        "startTime": {
            "name": "Start Time",
            "type": "string",
            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since epoch (Jan 1st, 1970)"
        },
        "endTime": {
            "name": "End Time",
            "type": "string",
            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since epoch (Jan 1st, 1970)"
        }
    }
    singleton = True

    def __init__(self):
        NexusHandler.__init__(self, skipCassandra=True)
        self.log = logging.getLogger(__name__)

    def parse_arguments(self, request):
        """Validate request arguments.

        Returns (ds, bounding_polygon, start_seconds, end_seconds).
        Raises NexusProcessingException (HTTP 400) for any missing/invalid
        argument.
        """
        self.log.debug("Parsing arguments")
        try:
            ds = request.get_dataset()[0]
        except:
            raise NexusProcessingException(reason="'ds' argument is required", code=400)

        try:
            bounding_polygon = box(request.get_min_lon(), request.get_min_lat(), request.get_max_lon(),
                                   request.get_max_lat())
        except:
            raise NexusProcessingException(
                reason="'minLon', 'minLat', 'maxLon', and 'maxLat' arguments are required.",
                code=400)

        try:
            start_time = request.get_start_datetime()
        except:
            raise NexusProcessingException(
                reason="'startTime' argument is required. Can be int value milliseconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
                code=400)
        try:
            end_time = request.get_end_datetime()
        except:
            raise NexusProcessingException(
                reason="'endTime' argument is required. Can be int value milliseconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
                code=400)

        start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
        end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())

        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch

    def calc(self, request, **args):
        """Run the per-point regression/statistics driver over every distinct
        tile bounding box in the search area and wrap the results."""
        ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch = self.parse_arguments(request)

        boxes = self._tile_service.get_distinct_bounding_boxes_in_polygon(bounding_polygon, ds,
                                                                          start_seconds_from_epoch,
                                                                          end_seconds_from_epoch)
        point_avg_over_time = lat_lon_map_driver(bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, ds,
                                                 [a_box.bounds for a_box in boxes])

        kwargs = {
            "minLon": bounding_polygon.bounds[0],
            "minLat": bounding_polygon.bounds[1],
            "maxLon": bounding_polygon.bounds[2],
            "maxLat": bounding_polygon.bounds[3],
            "ds": ds,
            "startTime": datetime.utcfromtimestamp(start_seconds_from_epoch).strftime('%Y-%m-%dT%H:%M:%SZ'),
            "endTime": datetime.utcfromtimestamp(end_seconds_from_epoch).strftime('%Y-%m-%dT%H:%M:%SZ')
        }
        return LongitudeLatitudeMapResults(
            results=LongitudeLatitudeMapHandlerImpl.results_to_dicts(point_avg_over_time), meta=None,
            **kwargs)

    @staticmethod
    def results_to_dicts(results):
        """Flatten driver tuples into JSON-safe dicts (NaN/Inf -> strings)."""

        # ((lon, lat), (slope, intercept, r_value, p_value, std_err, mean, pmax, pmin, pstd, pcnt))
        return [{
            'lon': result[0][0],
            'lat': result[0][1],
            'slope': result[1][0] if not math.isnan(result[1][0]) else 'NaN',
            'intercept': result[1][1] if not math.isnan(result[1][1]) else 'NaN',
            'r': result[1][2],
            'p': result[1][3],
            'stderr': result[1][4] if not math.isinf(result[1][4]) else 'Inf',
            'avg': result[1][5],
            'max': result[1][6],
            'min': result[1][7],
            'std': result[1][8],
            'cnt': result[1][9],
        } for result in results]


def pool_initializer():
    """Give each worker process its own tile service and Cassandra session."""
    from nexustiles.nexustiles import NexusTileService
    global tile_service
    tile_service = NexusTileService()
    # TODO This is a hack to make sure each sub-process uses it's own connection to cassandra. data-access needs to be updated
    from cassandra.cqlengine import connection
    from multiprocessing import current_process

    connection.register_connection(current_process().name, [host.address for host in connection.get_session().hosts])
    connection.set_default_connection(current_process().name)


def lat_lon_map_driver(search_bounding_polygon, search_start, search_end, ds, distinct_boxes):
    """Run regression_on_tiles over every distinct box and flatten results.

    Currently runs serially via map(); the commented-out code below is the
    intended multiprocessing path (see pool_initializer).
    """
    from functools import partial
    from nexustiles.nexustiles import NexusTileService
    # Start new processes to handle the work
    # pool = Pool(5, pool_initializer)

    # Bind all per-request constants so workers only receive the tile bounds.
    func = partial(regression_on_tiles, search_bounding_polygon_wkt=search_bounding_polygon.wkt,
                   search_start=search_start, search_end=search_end, ds=ds)

    global tile_service
    tile_service = NexusTileService()
    map_result = map(func, distinct_boxes)
    return [item for sublist in map_result for item in sublist]
    # TODO Use for multiprocessing:
    # result = pool.map_async(func, distinct_boxes)
    #
    # pool.close()
    # while not result.ready():
    #     print "Not ready"
    #     time.sleep(5)
    #
    # print result.successful()
    # print result.get()[0]
    # pool.join()
    # pool.terminate()
    #
    # return [item for sublist in result.get() for item in sublist]
calc_linear_regression(arry, xarry): + from scipy.stats import linregress + slope, intercept, r_value, p_value, std_err = linregress(arry, xarry) + return slope, intercept, r_value, p_value, std_err + + +def regression_on_tiles(tile_bounds, search_bounding_polygon_wkt, search_start, search_end, ds): + if len(tile_bounds) < 1: + return [] + + import numpy as np + import operator + from shapely import wkt + from shapely.geometry import box + search_bounding_shape = wkt.loads(search_bounding_polygon_wkt) + tile_bounding_shape = box(*tile_bounds) + + # Load all tiles for given (exact) bounding box across the search time range + tiles = tile_service.find_tiles_by_exact_bounds(tile_bounds, ds, search_start, search_end) + if search_bounding_shape.contains(tile_bounding_shape): + # The tile bounds are totally contained in the search area, we don't need to mask it. + pass + else: + # The tile bounds cross the search area borders, we need to mask the tiles to the search area + tiles = tile_service.mask_tiles_to_polygon(wkt.loads(search_bounding_polygon_wkt), tiles) + # If all tiles end up being masked, there is no work to do + if len(tiles) < 1: + return [] + tiles.sort(key=operator.attrgetter('min_time')) + + stacked_tile_data = np.stack(tuple([np.squeeze(tile.data, 0) for tile in tiles])) + stacked_tile_lons = np.stack([tile.longitudes for tile in tiles]) + stacked_tile_lats = np.stack([tile.latitudes for tile in tiles]) + + x_array = np.arange(stacked_tile_data.shape[0]) + point_regressions = np.apply_along_axis(calc_linear_regression, 0, stacked_tile_data, x_array) + point_means = np.nanmean(stacked_tile_data, 0) + point_maximums = np.nanmax(stacked_tile_data, 0) + point_minimums = np.nanmin(stacked_tile_data, 0) + point_st_deviations = np.nanstd(stacked_tile_data, 0) + point_counts = np.ma.count(np.ma.masked_invalid(stacked_tile_data), 0) + + results = [] + for lat_idx, lon_idx in np.ndindex(point_means.shape): + lon, lat = np.max(stacked_tile_lons[:, lon_idx]), 
np.max(stacked_tile_lats[:, lat_idx]) + pcnt = point_counts[lat_idx, lon_idx] + + if pcnt == 0: + continue + + mean = point_means[lat_idx, lon_idx] + pmax = point_maximums[lat_idx, lon_idx] + pmin = point_minimums[lat_idx, lon_idx] + pstd = point_st_deviations[lat_idx, lon_idx] + slope, intercept, r_value, p_value, std_err = point_regressions[:, lat_idx, lon_idx] + + results.append(((lon, lat), (slope, intercept, r_value, p_value, std_err, mean, pmax, pmin, pstd, pcnt))) + + return results + + +class LongitudeLatitudeMapResults(NexusResults): + def __init__(self, results=None, meta=None, computeOptions=None, **kwargs): + NexusResults.__init__(self, results=results, meta=meta, stats=None, computeOptions=computeOptions, **kwargs)
