# analysis/webservice/algorithms_spark/TimeSeriesSpark.py
"""
Copyright (c) 2016 Jet Propulsion Laboratory,
California Institute of Technology. All rights reserved
"""
import calendar
import itertools
import logging
import traceback
from cStringIO import StringIO
from datetime import datetime

import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pytz
import shapely.geometry
import shapely.wkt
from backports.functools_lru_cache import lru_cache
from nexustiles.nexustiles import NexusTileService
from pytz import timezone
from scipy import stats

from webservice import Filtering as filtering
from webservice.NexusHandler import nexus_handler, SparkHandler
from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException

EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'


@nexus_handler
class TimeSeriesHandlerImpl(SparkHandler):
    """Spark-backed time-series statistics over one or more datasets.

    For each day in the requested range, computes area-weighted min/max/mean/
    count/std of all tiles intersecting the bounding polygon, optionally
    removing the monthly climatological cycle and applying a low-pass filter.
    """

    name = "Time Series Spark"
    path = "/timeSeriesSpark"
    description = "Computes a time series plot between one or more datasets given an arbitrary geographical area and time range"
    params = {
        "ds": {
            "name": "Dataset",
            "type": "comma-delimited string",
            "description": "The dataset(s) Used to generate the Time Series. Required"
        },
        "startTime": {
            "name": "Start Time",
            "type": "string",
            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
        },
        "endTime": {
            "name": "End Time",
            "type": "string",
            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
        },
        "b": {
            "name": "Bounding box",
            "type": "comma-delimited float",
            "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
        },
        "seasonalFilter": {
            "name": "Compute Seasonal Cycle Filter",
            "type": "boolean",
            "description": "Flag used to specify if the seasonal averages should be computed during "
                           "Time Series computation. Optional (Default: False)"
        },
        "lowPassFilter": {
            "name": "Compute Low Pass Filter",
            "type": "boolean",
            "description": "Flag used to specify if a low pass filter should be computed during "
                           "Time Series computation. Optional (Default: True)"
        },
        "spark": {
            "name": "Spark Configuration",
            "type": "comma-delimited value",
            "description": "Configuration used to launch in the Spark cluster. Value should be 3 elements separated by "
                           "commas. 1) Spark Master 2) Number of Spark Executors 3) Number of Spark Partitions. Only "
                           "Number of Spark Partitions is used by this function. Optional (Default: local,1,1)"
        }
    }
    singleton = True

    def __init__(self):
        SparkHandler.__init__(self)
        self.log = logging.getLogger(__name__)

    def parse_arguments(self, request):
        """Validate and extract request parameters.

        Returns a tuple of (ds, bounding_polygon, start_seconds_from_epoch,
        end_seconds_from_epoch, apply_seasonal_cycle_filter,
        apply_low_pass_filter, spark_master, spark_nexecs, spark_nparts).

        Raises NexusProcessingException (HTTP 400) on any missing/invalid
        argument or when a climatology dataset is requested.
        """
        self.log.debug("Parsing arguments")

        try:
            ds = request.get_dataset()
            # Normalize a single dataset name to a 1-tuple so callers can
            # always iterate.
            if type(ds) != list and type(ds) != tuple:
                ds = (ds,)
        except:
            raise NexusProcessingException(
                reason="'ds' argument is required. Must be comma-delimited string",
                code=400)

        # Do not allow time series on Climatology
        if next(iter([clim for clim in ds if 'CLIM' in clim]), False):
            raise NexusProcessingException(reason="Cannot compute time series on a climatology", code=400)

        try:
            bounding_polygon = request.get_bounding_polygon()
            # Patch the accessor methods so downstream code can use the
            # polygon's bounds uniformly regardless of how 'b' was supplied.
            request.get_min_lon = lambda: bounding_polygon.bounds[0]
            request.get_min_lat = lambda: bounding_polygon.bounds[1]
            request.get_max_lon = lambda: bounding_polygon.bounds[2]
            request.get_max_lat = lambda: bounding_polygon.bounds[3]
        except:
            try:
                west, south, east, north = request.get_min_lon(), request.get_min_lat(), \
                                           request.get_max_lon(), request.get_max_lat()
                bounding_polygon = shapely.geometry.Polygon(
                    [(west, south), (east, south), (east, north), (west, north), (west, south)])
            except:
                raise NexusProcessingException(
                    reason="'b' argument is required. Must be comma-delimited float formatted as "
                           "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
                    code=400)

        try:
            start_time = request.get_start_datetime()
        except:
            raise NexusProcessingException(
                reason="'startTime' argument is required. Can be int value seconds from epoch or "
                       "string format YYYY-MM-DDTHH:mm:ssZ",
                code=400)
        try:
            end_time = request.get_end_datetime()
        except:
            raise NexusProcessingException(
                reason="'endTime' argument is required. Can be int value seconds from epoch or "
                       "string format YYYY-MM-DDTHH:mm:ssZ",
                code=400)

        if start_time > end_time:
            raise NexusProcessingException(
                reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
                    request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
                code=400)

        apply_seasonal_cycle_filter = request.get_apply_seasonal_cycle_filter(default=False)
        apply_low_pass_filter = request.get_apply_low_pass_filter()

        spark_master, spark_nexecs, spark_nparts = request.get_spark_cfg()

        start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
        end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())

        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
               apply_seasonal_cycle_filter, apply_low_pass_filter, spark_master, spark_nexecs, spark_nparts

    def calc(self, request, **args):
        """Compute the time series for each requested dataset.

        :param request: StatsComputeOptions
        :param args: dict
        :return: TimeSeriesResults containing per-day stats, metadata per
                 dataset, and (for exactly two datasets) comparison stats.
        """
        ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
        apply_seasonal_cycle_filter, apply_low_pass_filter, spark_master, \
        spark_nexecs, spark_nparts = self.parse_arguments(request)

        resultsRaw = []

        for shortName in ds:

            the_time = datetime.now()
            daysinrange = self._tile_service.find_days_in_range_asc(bounding_polygon.bounds[1],
                                                                    bounding_polygon.bounds[3],
                                                                    bounding_polygon.bounds[0],
                                                                    bounding_polygon.bounds[2],
                                                                    shortName,
                                                                    start_seconds_from_epoch,
                                                                    end_seconds_from_epoch)
            self.log.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), shortName))

            ndays = len(daysinrange)
            if ndays == 0:
                raise NoDataException(reason="No data found for selected timeframe")

            self.log.debug('Found {0} days in range'.format(ndays))
            for i, d in enumerate(daysinrange):
                self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))
            # Never request more partitions than there are days of work.
            spark_nparts_needed = min(spark_nparts, ndays)

            the_time = datetime.now()
            results, meta = spark_driver(daysinrange, bounding_polygon, shortName,
                                         spark_nparts_needed=spark_nparts_needed, sc=self._sc)
            self.log.info("Time series calculation took %s for dataset %s" % (str(datetime.now() - the_time), shortName))

            if apply_seasonal_cycle_filter:
                the_time = datetime.now()
                for result in results:
                    month = datetime.utcfromtimestamp(result['time']).month
                    month_mean, month_max, month_min = self.calculate_monthly_average(month, bounding_polygon.wkt,
                                                                                      shortName)
                    # Seasonal anomaly: subtract the monthly climatology from
                    # the daily statistic.
                    seasonal_mean = result['mean'] - month_mean
                    seasonal_min = result['min'] - month_min
                    seasonal_max = result['max'] - month_max
                    result['meanSeasonal'] = seasonal_mean
                    result['minSeasonal'] = seasonal_min
                    result['maxSeasonal'] = seasonal_max
                self.log.info(
                    "Seasonal calculation took %s for dataset %s" % (str(datetime.now() - the_time), shortName))

            the_time = datetime.now()
            filtering.applyAllFiltersOnField(results, 'mean', applySeasonal=False, applyLowPass=apply_low_pass_filter)
            filtering.applyAllFiltersOnField(results, 'max', applySeasonal=False, applyLowPass=apply_low_pass_filter)
            filtering.applyAllFiltersOnField(results, 'min', applySeasonal=False, applyLowPass=apply_low_pass_filter)

            if apply_seasonal_cycle_filter and apply_low_pass_filter:
                # NOTE(review): this calls applyFiltersOnField while the block
                # above calls applyAllFiltersOnField — confirm both exist in
                # webservice.Filtering and that the asymmetry is intentional.
                try:
                    filtering.applyFiltersOnField(results, 'meanSeasonal', applySeasonal=False, applyLowPass=True,
                                                  append="LowPass")
                    filtering.applyFiltersOnField(results, 'minSeasonal', applySeasonal=False, applyLowPass=True,
                                                  append="LowPass")
                    filtering.applyFiltersOnField(results, 'maxSeasonal', applySeasonal=False, applyLowPass=True,
                                                  append="LowPass")
                except Exception as e:
                    # If it doesn't work log the error but ignore it
                    tb = traceback.format_exc()
                    self.log.warn("Error calculating SeasonalLowPass filter:\n%s" % tb)

            resultsRaw.append([results, meta])
            self.log.info(
                "LowPass filter calculation took %s for dataset %s" % (str(datetime.now() - the_time), shortName))

            the_time = datetime.now()
            self._create_nc_file_time1d(np.array(results), 'ts.nc', 'mean',
                                        fill=-9999.)
            self.log.info(
                "NetCDF generation took %s for dataset %s" % (str(datetime.now() - the_time), shortName))

        the_time = datetime.now()
        results = self._mergeResults(resultsRaw)

        if len(ds) == 2:
            try:
                stats = TimeSeriesHandlerImpl.calculate_comparison_stats(results)
            except Exception:
                # Comparison stats are best-effort; a failure must not sink
                # the whole request.
                stats = {}
                tb = traceback.format_exc()
                self.log.warn("Error when calculating comparison stats:\n%s" % tb)
        else:
            stats = {}

        meta = []
        for singleRes in resultsRaw:
            meta.append(singleRes[1])

        res = TimeSeriesResults(results=results, meta=meta, stats=stats,
                                computeOptions=None, minLat=bounding_polygon.bounds[1],
                                maxLat=bounding_polygon.bounds[3], minLon=bounding_polygon.bounds[0],
                                maxLon=bounding_polygon.bounds[2], ds=ds, startTime=start_seconds_from_epoch,
                                endTime=end_seconds_from_epoch)

        self.log.info("Merging results and calculating comparisons took %s" % (str(datetime.now() - the_time)))
        return res

    # NOTE(review): lru_cache on an instance method pins `self` in the cache;
    # acceptable here only because the handler is a singleton.
    @lru_cache()
    def calculate_monthly_average(self, month=None, bounding_polygon_wkt=None, ds=None):
        """Return the climatological (mean, max, min) for `month` over the
        polygon, weighted by per-tile observation counts across all years of
        the dataset. The WKT string (not the polygon object) is the argument
        so the lru_cache key is hashable.
        """
        min_date, max_date = self.get_min_max_date(ds=ds)

        monthly_averages, monthly_counts = [], []
        monthly_mins, monthly_maxes = [], []
        bounding_polygon = shapely.wkt.loads(bounding_polygon_wkt)
        for year in range(min_date.year, max_date.year + 1):
            beginning_of_month = datetime(year, month, 1)
            end_of_month = datetime(year, month, calendar.monthrange(year, month)[1], 23, 59, 59)
            start = (pytz.UTC.localize(beginning_of_month) - EPOCH).total_seconds()
            end = (pytz.UTC.localize(end_of_month) - EPOCH).total_seconds()
            tile_stats = self._tile_service.find_tiles_in_polygon(bounding_polygon, ds, start, end,
                                                                  fl=('id,'
                                                                      'tile_avg_val_d,tile_count_i,'
                                                                      'tile_min_val_d,tile_max_val_d,'
                                                                      'tile_min_lat,tile_max_lat,'
                                                                      'tile_min_lon,tile_max_lon'),
                                                                  fetch_data=False)
            if len(tile_stats) == 0:
                continue

            # Split the list into tiles on the border of the bounding box and
            # tiles completely inside the bounding box.
            border_tiles, inner_tiles = [], []
            for tile in tile_stats:
                tile_box = shapely.geometry.box(tile.bbox.min_lon, tile.bbox.min_lat,
                                                tile.bbox.max_lon, tile.bbox.max_lat)
                if bounding_polygon.contains(tile_box):
                    inner_tiles.append(tile)
                else:
                    border_tiles.append(tile)

            # We can use the stats of the inner tiles directly
            tile_means = [tile.tile_stats.mean for tile in inner_tiles]
            tile_mins = [tile.tile_stats.min for tile in inner_tiles]
            tile_maxes = [tile.tile_stats.max for tile in inner_tiles]
            tile_counts = [tile.tile_stats.count for tile in inner_tiles]

            # Border tiles need to have the data loaded, masked to the
            # polygon, and their stats recalculated.
            border_tiles = list(self._tile_service.fetch_data_for_tiles(*border_tiles))
            border_tiles = self._tile_service.mask_tiles_to_polygon(bounding_polygon, border_tiles)
            for tile in border_tiles:
                tile.update_stats()
                tile_means.append(tile.tile_stats.mean)
                tile_mins.append(tile.tile_stats.min)
                tile_maxes.append(tile.tile_stats.max)
                tile_counts.append(tile.tile_stats.count)

            tile_means = np.array(tile_means)
            tile_mins = np.array(tile_mins)
            tile_maxes = np.array(tile_maxes)
            tile_counts = np.array(tile_counts)

            sum_tile_counts = np.sum(tile_counts) * 1.0

            monthly_averages += [np.average(tile_means, None, tile_counts / sum_tile_counts).item()]
            monthly_mins += [np.average(tile_mins, None, tile_counts / sum_tile_counts).item()]
            monthly_maxes += [np.average(tile_maxes, None, tile_counts / sum_tile_counts).item()]
            monthly_counts += [sum_tile_counts]

        count_sum = np.sum(monthly_counts) * 1.0
        weights = np.array(monthly_counts) / count_sum

        # BUG FIX: the original returned the weighted mean three times,
        # silently discarding monthly_maxes/monthly_mins. The caller unpacks
        # (month_mean, month_max, month_min), so return mean, max, min.
        return np.average(monthly_averages, None, weights).item(), \
               np.average(monthly_maxes, None, weights).item(), \
               np.average(monthly_mins, None, weights).item()

    @lru_cache()
    def get_min_max_date(self, ds=None):
        """Return the (first, last) UTC dates for which `ds` has data."""
        min_date = pytz.timezone('UTC').localize(
            datetime.utcfromtimestamp(self._tile_service.get_min_time([], ds=ds)))
        max_date = pytz.timezone('UTC').localize(
            datetime.utcfromtimestamp(self._tile_service.get_max_time([], ds=ds)))

        return min_date.date(), max_date.date()

    @staticmethod
    def calculate_comparison_stats(results):
        """Linear regression of dataset 1's daily means against dataset 0's.

        Only time slots present in both series (len(item) == 2) contribute.
        Returns slope, intercept, r, p, and standard error.
        """
        xy = [[], []]

        for item in results:
            if len(item) == 2:
                # item[n]["ds"] is the series index (0 or 1).
                xy[item[0]["ds"]].append(item[0]["mean"])
                xy[item[1]["ds"]].append(item[1]["mean"])

        slope, intercept, r_value, p_value, std_err = stats.linregress(xy[0], xy[1])
        comparisonStats = {
            "slope": slope,
            "intercept": intercept,
            "r": r_value,
            "p": p_value,
            "err": std_err
        }

        return comparisonStats


class TimeSeriesResults(NexusResults):
    """Result wrapper that can render itself as a line or scatter plot PNG."""

    LINE_PLOT = "line"
    SCATTER_PLOT = "scatter"

    __SERIES_COLORS = ['red', 'blue']

    def toImage(self):
        """Render the results as a PNG according to the requested plot type.

        NOTE(review): calc() constructs this with computeOptions=None; confirm
        NexusResults substitutes a usable options object before toImage() is
        invoked, otherwise get_plot_type() would fail.
        """
        plot_type = self.computeOptions().get_plot_type()

        if plot_type == TimeSeriesResults.LINE_PLOT or plot_type == "default":
            return self.createLinePlot()
        elif plot_type == TimeSeriesResults.SCATTER_PLOT:
            return self.createScatterPlot()
        else:
            raise Exception("Invalid or unsupported time series plot specified")

    def createScatterPlot(self):
        """Scatter of dataset-1 values vs dataset-0 values with a least
        squares fit line. Returns PNG bytes."""
        timeSeries = []
        series0 = []
        series1 = []

        res = self.results()
        meta = self.meta()

        plotSeries = self.computeOptions().get_plot_series() if self.computeOptions is not None else None
        if plotSeries is None:
            plotSeries = "mean"

        for m in res:
            if len(m) == 2:
                # NOTE(review): 'time' is divided by 1000 here but
                # calc_average_on_day stores whole seconds — confirm units.
                timeSeries.append(datetime.fromtimestamp(m[0]["time"] / 1000))
                series0.append(m[0][plotSeries])
                series1.append(m[1][plotSeries])

        title = ', '.join(set([m['title'] for m in meta]))
        sources = ', '.join(set([m['source'] for m in meta]))
        dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))

        fig, ax = plt.subplots()
        fig.set_size_inches(11.0, 8.5)
        ax.scatter(series0, series1, alpha=0.5)
        ax.set_xlabel(meta[0]['units'])
        ax.set_ylabel(meta[1]['units'])
        ax.set_title("%s\n%s\n%s" % (title, sources, dateRange))

        # First-degree polynomial fit gives the regression line.
        par = np.polyfit(series0, series1, 1, full=True)
        slope = par[0][0]
        intercept = par[0][1]
        xl = [min(series0), max(series0)]
        yl = [slope * xx + intercept for xx in xl]
        plt.plot(xl, yl, '-r')

        ax.grid(True)
        fig.tight_layout()

        sio = StringIO()
        plt.savefig(sio, format='png')
        return sio.getvalue()

    def createLinePlot(self):
        """One line per dataset (second series on a twin y-axis).
        Returns PNG bytes."""
        nseries = len(self.meta())
        res = self.results()
        meta = self.meta()

        timeSeries = [datetime.fromtimestamp(m[0]["time"] / 1000) for m in res]

        # Pre-fill with NaN so missing days leave gaps instead of zeros.
        means = [[np.nan] * len(res) for n in range(0, nseries)]

        plotSeries = self.computeOptions().get_plot_series() if self.computeOptions is not None else None
        if plotSeries is None:
            plotSeries = "mean"

        for n in range(0, len(res)):
            timeSlot = res[n]
            for seriesValues in timeSlot:
                means[seriesValues['ds']][n] = seriesValues[plotSeries]

        x = timeSeries

        fig, axMain = plt.subplots()
        fig.set_size_inches(11.0, 8.5)
        fig.autofmt_xdate()

        title = ', '.join(set([m['title'] for m in meta]))
        sources = ', '.join(set([m['source'] for m in meta]))
        dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))

        axMain.set_title("%s\n%s\n%s" % (title, sources, dateRange))
        axMain.set_xlabel('Date')
        axMain.grid(True)
        axMain.xaxis.set_major_locator(mdates.YearLocator())
        axMain.xaxis.set_major_formatter(mdates.DateFormatter('%b %Y'))
        axMain.xaxis.set_minor_locator(mdates.MonthLocator())
        axMain.format_xdata = mdates.DateFormatter('%Y-%m-%d')

        plots = []

        for n in range(0, nseries):
            if n == 0:
                ax = axMain
            else:
                # Additional series share the x-axis but get their own y-axis.
                ax = ax.twinx()

            plots += ax.plot(x, means[n], color=self.__SERIES_COLORS[n], zorder=10, linewidth=3,
                             label=meta[n]['title'])
            ax.set_ylabel(meta[n]['units'])

        labs = [l.get_label() for l in plots]
        axMain.legend(plots, labs, loc=0)

        sio = StringIO()
        plt.savefig(sio, format='png')
        return sio.getvalue()


def spark_driver(daysinrange, bounding_polygon, ds, fill=-9999., spark_nparts_needed=1, sc=None):
    """Distribute the per-day stats computation over Spark.

    Each partition receives (polygon WKT, dataset name, timestamps, fill) so
    the payload is picklable. Results from all partitions are flattened and
    sorted by time. Returns (results, meta) where meta is currently empty.
    """
    nexus_tiles_spark = [(bounding_polygon.wkt, ds,
                          list(daysinrange_part), fill)
                         for daysinrange_part
                         in np.array_split(daysinrange,
                                           spark_nparts_needed)]

    # Launch Spark computations
    rdd = sc.parallelize(nexus_tiles_spark, spark_nparts_needed)
    results = rdd.map(calc_average_on_day).collect()
    results = list(itertools.chain.from_iterable(results))
    results = sorted(results, key=lambda entry: entry["time"])

    return results, {}


def calc_average_on_day(tile_in_spark):
    """Spark worker: compute daily min/max/mean/count/std for one partition.

    The mean is latitude-weighted (cos of latitude approximates grid-cell
    area). Runs on executors, so it re-imports shapely.wkt and constructs its
    own NexusTileService. Returns a list of per-day stat dicts.
    """
    import shapely.wkt
    (bounding_wkt, dataset, timestamps, fill) = tile_in_spark
    if len(timestamps) == 0:
        return []
    tile_service = NexusTileService()
    ds1_nexus_tiles = \
        tile_service.get_tiles_bounded_by_polygon(shapely.wkt.loads(bounding_wkt),
                                                  dataset,
                                                  timestamps[0],
                                                  timestamps[-1],
                                                  rows=5000)

    # Bucket tile indices by their (single) timestamp.
    # NOTE(review): assumes every returned tile's times[0] is one of the
    # requested timestamps; a stray timestamp would raise KeyError here.
    tile_dict = {}
    for timeinseconds in timestamps:
        tile_dict[timeinseconds] = []

    for i in range(len(ds1_nexus_tiles)):
        tile = ds1_nexus_tiles[i]
        tile_dict[tile.times[0]].append(i)

    stats_arr = []
    for timeinseconds in timestamps:
        cur_tile_list = tile_dict[timeinseconds]
        if len(cur_tile_list) == 0:
            continue
        tile_data_agg = \
            np.ma.array(data=np.hstack([ds1_nexus_tiles[i].data.data.flatten()
                                        for i in cur_tile_list
                                        if (ds1_nexus_tiles[i].times[0] ==
                                            timeinseconds)]),
                        mask=np.hstack([ds1_nexus_tiles[i].data.mask.flatten()
                                        for i in cur_tile_list
                                        if (ds1_nexus_tiles[i].times[0] ==
                                            timeinseconds)]))
        lats_agg = np.hstack([np.repeat(ds1_nexus_tiles[i].latitudes,
                                        len(ds1_nexus_tiles[i].longitudes))
                              for i in cur_tile_list
                              if (ds1_nexus_tiles[i].times[0] ==
                                  timeinseconds)])
        if (len(tile_data_agg) == 0) or tile_data_agg.mask.all():
            continue
        else:
            data_min = np.ma.min(tile_data_agg)
            data_max = np.ma.max(tile_data_agg)
            # Latitude-weighted mean: weight each value by cos(latitude).
            daily_mean = \
                np.ma.average(tile_data_agg,
                              weights=np.cos(np.radians(lats_agg))).item()
            data_count = np.ma.count(tile_data_agg)
            data_std = np.ma.std(tile_data_agg)

        # Return Stats by day
        stat = {
            'min': data_min,
            'max': data_max,
            'mean': daily_mean,
            'cnt': data_count,
            'std': data_std,
            'time': int(timeinseconds)
        }
        stats_arr.append(stat)
    return stats_arr
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/__init__.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms_spark/__init__.py b/analysis/webservice/algorithms_spark/__init__.py new file mode 100644 index 0000000..51727dc --- /dev/null +++ b/analysis/webservice/algorithms_spark/__init__.py @@ -0,0 +1,58 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" +import os +import logging + +log = logging.getLogger(__name__) + + +def module_exists(module_name): + try: + __import__(module_name) + except ImportError: + return False + else: + return True + + +if module_exists("pyspark"): + try: + import CorrMapSpark + except ImportError: + pass + + try: + import Matchup + except ImportError: + pass + + try: + import TimeAvgMapSpark + except ImportError: + pass + + try: + import TimeSeriesSpark + except ImportError: + pass + + try: + import ClimMapSpark + except ImportError: + pass + + try: + import DailyDifferenceAverageSpark + except ImportError: + pass + + try: + import HofMoellerSpark + except ImportError: + pass + + +else: + log.warn("pyspark not found. 
Skipping algorithms in %s" % os.path.dirname(__file__)) http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/config/algorithms.ini ---------------------------------------------------------------------- diff --git a/analysis/webservice/config/algorithms.ini b/analysis/webservice/config/algorithms.ini new file mode 100644 index 0000000..fae7ae5 --- /dev/null +++ b/analysis/webservice/config/algorithms.ini @@ -0,0 +1,5 @@ +[multiprocessing] +maxprocesses=8 + +[spark] +maxconcurrentjobs=10 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/config/web.ini ---------------------------------------------------------------------- diff --git a/analysis/webservice/config/web.ini b/analysis/webservice/config/web.ini new file mode 100644 index 0000000..ed9bbb7 --- /dev/null +++ b/analysis/webservice/config/web.ini @@ -0,0 +1,11 @@ +[global] +server.socket_port=8083 +server.socket_host = '127.0.0.1' +server.max_simultaneous_requests = 10 + +[static] +static_enabled=true +static_dir=static + +[modules] +module_dirs=webservice.algorithms,webservice.algorithms_spark,webservice.algorithms.doms \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/matserver.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/matserver.py b/analysis/webservice/matserver.py new file mode 100644 index 0000000..5cdd8a2 --- /dev/null +++ b/analysis/webservice/matserver.py @@ -0,0 +1,30 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. 
All rights reserved +""" +import matplotlib.pyplot as plt +import numpy as np + +import mpld3 +from mpld3 import plugins + +fig, ax = plt.subplots() + +x = np.linspace(-2, 2, 20) +y = x[:, None] +X = np.zeros((20, 20, 4)) + +X[:, :, 0] = np.exp(- (x - 1) ** 2 - (y) ** 2) +X[:, :, 1] = np.exp(- (x + 0.71) ** 2 - (y - 0.71) ** 2) +X[:, :, 2] = np.exp(- (x + 0.71) ** 2 - (y + 0.71) ** 2) +X[:, :, 3] = np.exp(-0.25 * (x ** 2 + y ** 2)) + +im = ax.imshow(X, extent=(10, 20, 10, 20), + origin='lower', zorder=1, interpolation='nearest') +fig.colorbar(im, ax=ax) + +ax.set_title('An Image', size=20) + +plugins.connect(fig, plugins.MousePosition(fontsize=14)) + +mpld3.show() http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/plotting.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/plotting.py b/analysis/webservice/plotting.py new file mode 100644 index 0000000..041c597 --- /dev/null +++ b/analysis/webservice/plotting.py @@ -0,0 +1,552 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. 
All rights reserved +""" +import pyximport; + +pyximport.install() + +import os +import numpy as np +import matplotlib.pyplot as plt +import ConfigParser +import matplotlib.dates as mdates +from matplotlib import cm +import pickle +from cStringIO import StringIO +import datetime +from matplotlib.ticker import FuncFormatter +from matplotlib.colors import LightSource +from mpl_toolkits.basemap import Basemap +import mpld3 + + +# TODO needs to be updated to use nexustiles + +def __writeAndShowImageData(data, delete=True): + f = open("foo.png", "wb") + f.write(data) + f.close() + + plt.clf() + image = plt.imread("foo.png") + plt.imshow(image) + plt.axis('off') # clear x- and y-axes + plt.show() + if delete: + os.unlink("foo.png") + + +def createLatLonTimeAverageMapGlobe(res, meta, startTime=None, endTime=None): + latSeries = [m[0]['lat'] for m in res] + lonSeries = [m['lon'] for m in res[0]][:] + data = np.zeros((len(lonSeries), len(latSeries))) + for t in range(0, len(latSeries)): + latSet = res[t] + for l in range(0, len(lonSeries)): + data[l][t] = latSet[l]['avg'] + data[data == 0.0] = np.nan + # data = np.rot90(data, 3) + lats, lons = np.meshgrid(latSeries, lonSeries) + masked_array = np.ma.array(data, mask=np.isnan(data)) + z = masked_array + + fig = plt.figure() + fig.set_size_inches(11.0, 8.5) + ax = fig.add_axes([0.05, 0.05, 0.9, 0.9]) + + m = Basemap(projection='ortho', lat_0=20, lon_0=-100, resolution='l') + # m.drawmapboundary(fill_color='0.3') + im1 = m.pcolormesh(lons, lats, z, shading='flat', cmap=plt.cm.jet, latlon=True) + m.drawparallels(np.arange(-90., 99., 30.)) + m.drawmeridians(np.arange(-180., 180., 60.)) + # m.drawcoastlines() + # m.drawcountries() + cb = m.colorbar(im1, "bottom", size="5%", pad="2%") + + title = meta['title'] + source = meta['source'] + if startTime is not None and endTime is not None: + if type(startTime) is not datetime.datetime: + startTime = datetime.datetime.fromtimestamp(startTime / 1000) + if type(endTime) is not 
datetime.datetime: + endTime = datetime.datetime.fromtimestamp(endTime / 1000) + dateRange = "%s - %s" % (startTime.strftime('%b %Y'), endTime.strftime('%b %Y')) + else: + dateRange = "" + + ax.set_title("%s\n%s\n%s" % (title, source, dateRange)) + ax.set_ylabel('Latitude') + ax.set_xlabel('Longitude') + + sio = StringIO() + plt.savefig(sio, format='png') + return sio.getvalue() + + +def createLatLonTimeAverageMapProjected(res, meta, startTime=None, endTime=None): + latSeries = [m[0]['lat'] for m in res] + lonSeries = [m['lon'] for m in res[0]][:] + data = np.zeros((len(lonSeries), len(latSeries))) + for t in range(0, len(latSeries)): + latSet = res[t] + for l in range(0, len(lonSeries)): + data[l][t] = latSet[l]['avg'] + data[data == 0.0] = np.nan + # data = np.rot90(data, 3) + lats, lons = np.meshgrid(latSeries, lonSeries) + masked_array = np.ma.array(data, mask=np.isnan(data)) + z = masked_array + + fig = plt.figure() + fig.set_size_inches(11.0, 8.5) + ax = fig.add_axes([0.05, 0.05, 0.9, 0.9]) + + m = Basemap(projection='kav7', lon_0=0, resolution=None) + # m.drawmapboundary(fill_color='0.3') + im1 = m.pcolormesh(lons, lats, z, shading='gouraud', cmap=plt.cm.jet, latlon=True) + m.drawparallels(np.arange(-90., 99., 30.)) + m.drawmeridians(np.arange(-180., 180., 60.)) + # m.drawcoastlines() + # m.drawcountries() + cb = m.colorbar(im1, "bottom", size="5%", pad="2%") + + title = meta['title'] + source = meta['source'] + if startTime is not None and endTime is not None: + if type(startTime) is not datetime.datetime: + startTime = datetime.datetime.fromtimestamp(startTime / 1000) + if type(endTime) is not datetime.datetime: + endTime = datetime.datetime.fromtimestamp(endTime / 1000) + dateRange = "%s - %s" % (startTime.strftime('%b %Y'), endTime.strftime('%b %Y')) + else: + dateRange = "" + + ax.set_title("%s\n%s\n%s" % (title, source, dateRange)) + ax.set_ylabel('Latitude') + ax.set_xlabel('Longitude') + + sio = StringIO() + plt.savefig(sio, format='png') + return 
sio.getvalue() + + +def createLatLonTimeAverageMap3d(res, meta, startTime=None, endTime=None): + latSeries = [m[0]['lat'] for m in res][::-1] + lonSeries = [m['lon'] for m in res[0]] + data = np.zeros((len(latSeries), len(lonSeries))) + for t in range(0, len(latSeries)): + latSet = res[t] + for l in range(0, len(lonSeries)): + data[len(latSeries) - t - 1][l] = latSet[l]['avg'] + data[data == 0.0] = np.nan + + x, y = np.meshgrid(latSeries, lonSeries) + z = data + + region = np.s_[0:178, 0:178] + x, y, z = x[region], y[region], z[region] + + fig, ax = plt.subplots(subplot_kw=dict(projection='3d')) + + ls = LightSource(270, 45) + masked_array = np.ma.array(z, mask=np.isnan(z)) + rgb = ls.shade(masked_array, cmap=cm.gist_earth) # , vert_exag=0.1, blend_mode='soft') + surf = ax.plot_surface(x, y, masked_array, rstride=1, cstride=1, facecolors=rgb, + linewidth=0, antialiased=False, shade=False) + sio = StringIO() + plt.savefig(sio, format='png') + return sio.getvalue() + + +def createLatLonTimeAverageMap(res, meta, startTime=None, endTime=None): + latSeries = [m[0]['lat'] for m in res][::-1] + lonSeries = [m['lon'] for m in res[0]] + + data = np.zeros((len(latSeries), len(lonSeries))) + + for t in range(0, len(latSeries)): + latSet = res[t] + for l in range(0, len(lonSeries)): + data[len(latSeries) - t - 1][l] = latSet[l]['avg'] + + def yFormatter(y, pos): + if y < len(latSeries): + return '%s $^\circ$' % (int(latSeries[int(y)] * 100.0) / 100.) + else: + return "" + + def xFormatter(x, pos): + if x < len(lonSeries): + return "%s $^\circ$" % (int(lonSeries[int(x)] * 100.0) / 100.) 
+ else: + return "" + + data[data == 0.0] = np.nan + fig, ax = plt.subplots() + fig.set_size_inches(11.0, 8.5) + + cmap = cm.coolwarm + ls = LightSource(315, 45) + masked_array = np.ma.array(data, mask=np.isnan(data)) + rgb = ls.shade(masked_array, cmap) + + cax = ax.imshow(rgb, interpolation='nearest', cmap=cmap) + + ax.yaxis.set_major_formatter(FuncFormatter(yFormatter)) + ax.xaxis.set_major_formatter(FuncFormatter(xFormatter)) + + title = meta['title'] + source = meta['source'] + if startTime is not None and endTime is not None: + if type(startTime) is not datetime.datetime: + startTime = datetime.datetime.fromtimestamp(startTime / 1000) + if type(endTime) is not datetime.datetime: + endTime = datetime.datetime.fromtimestamp(endTime / 1000) + dateRange = "%s - %s" % (startTime.strftime('%b %Y'), endTime.strftime('%b %Y')) + else: + dateRange = "" + + ax.set_title("%s\n%s\n%s" % (title, source, dateRange)) + ax.set_ylabel('Latitude') + ax.set_xlabel('Longitude') + + fig.colorbar(cax) + fig.autofmt_xdate() + + sio = StringIO() + plt.savefig(sio, format='png') + return sio.getvalue() + + +def __createLatLonTimeAverageMapTest(ds, minLat, minLon, maxLat, maxLon, startTime, endTime, mask, create=False, + delete=True, projection='flat'): + if create: + config = ConfigParser.RawConfigParser() + config.read("sci.ini") + + sci = Sci(config) + + res, meta = sci.getLongitudeLatitudeMapStatsForBox(minLat, + maxLat, + minLon, + maxLon, + ds, + startTime, + endTime, + mask) + + output = open('data_latlon_time_avg.pkl', 'wb') + pickle.dump((res, meta), output) + output.close() + + pkl_file = open('data_latlon_time_avg.pkl', 'rb') + data1 = pickle.load(pkl_file) + pkl_file.close() + res, meta = data1 + if projection == 'flat': + bytes = createLatLonTimeAverageMap(res, meta, startTime, endTime) + elif projection == '3d': + bytes = createLatLonTimeAverageMap3d(res, meta, startTime, endTime) + elif projection == 'projected': + bytes = createLatLonTimeAverageMapProjected(res, meta, 
startTime, endTime) + elif projection == 'globe': + bytes = createLatLonTimeAverageMapGlobe(res, meta, startTime, endTime) + __writeAndShowImageData(bytes, delete=delete) + + +def createHoffmueller(data, coordSeries, timeSeries, coordName, title, interpolate='nearest'): + cmap = cm.coolwarm + # ls = LightSource(315, 45) + # rgb = ls.shade(data, cmap) + + fig, ax = plt.subplots() + fig.set_size_inches(11.0, 8.5) + cax = ax.imshow(data, interpolation=interpolate, cmap=cmap) + + def yFormatter(y, pos): + if y < len(coordSeries): + return "%s $^\circ$" % (int(coordSeries[int(y)] * 100.0) / 100.) + else: + return "" + + def xFormatter(x, pos): + if x < len(timeSeries): + return timeSeries[int(x)].strftime('%b %Y') + else: + return "" + + ax.xaxis.set_major_formatter(FuncFormatter(xFormatter)) + ax.yaxis.set_major_formatter(FuncFormatter(yFormatter)) + + ax.set_title(title) + ax.set_ylabel(coordName) + ax.set_xlabel('Date') + + fig.colorbar(cax) + fig.autofmt_xdate() + + labels = ['point {0}'.format(i + 1) for i in range(len(data))] + # plugins.connect(fig, plugins.MousePosition(fontsize=14)) + tooltip = mpld3.plugins.PointLabelTooltip(cax, labels=labels) + mpld3.plugins.connect(fig, tooltip) + mpld3.show() + # sio = StringIO() + # plt.savefig(sio, format='png') + # return sio.getvalue() + + +def createLongitudeHoffmueller(res, meta): + lonSeries = [m['longitude'] for m in res[0]['lons']] + timeSeries = [datetime.datetime.fromtimestamp(m['time'] / 1000) for m in res] + + data = np.zeros((len(lonSeries), len(timeSeries))) + + for t in range(0, len(timeSeries)): + timeSet = res[t] + for l in range(0, len(lonSeries)): + latSet = timeSet['lons'][l] + value = latSet['avg'] + data[len(lonSeries) - l - 1][t] = value + + title = meta['title'] + source = meta['source'] + dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y')) + + return createHoffmueller(data, lonSeries, timeSeries, "Longitude", "%s\n%s\n%s" % (title, source, dateRange), + 
interpolate='nearest') + + +def createLongitudeHoffmuellerTest(ds, minLat, minLon, maxLat, maxLon, startTime, endTime, mask, create=False, + delete=True): + if create: + config = ConfigParser.RawConfigParser() + config.read("sci.ini") + + sci = Sci(config) + + res, meta = sci.getLongitudeTimeHofMoellerStatsForBox(minLat, + maxLat, + minLon, + maxLon, + ds, + startTime, + endTime, + mask) + + output = open('data_lon_hoff.pkl', 'wb') + pickle.dump((res, meta), output) + output.close() + + pkl_file = open('data_lon_hoff.pkl', 'rb') + data1 = pickle.load(pkl_file) + pkl_file.close() + res, meta = data1 + bytes = createLongitudeHoffmueller(res, meta) + __writeAndShowImageData(bytes, delete=delete) + + +def createLatitudeHoffmueller(res, meta): + latSeries = [m['latitude'] for m in res[0]['lats']] + timeSeries = [datetime.datetime.fromtimestamp(m['time'] / 1000) for m in res] + + data = np.zeros((len(latSeries), len(timeSeries))) + + for t in range(0, len(timeSeries)): + timeSet = res[t] + for l in range(0, len(latSeries)): + latSet = timeSet['lats'][l] + value = latSet['avg'] + data[len(latSeries) - l - 1][t] = value + + title = meta['title'] + source = meta['source'] + dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y')) + + return createHoffmueller(data, latSeries, timeSeries, "Latitude", title="%s\n%s\n%s" % (title, source, dateRange), + interpolate='nearest') + + +def createLatitudeHoffmuellerTest(ds, minLat, minLon, maxLat, maxLon, startTime, endTime, mask, create=False, + delete=True): + if create: + config = ConfigParser.RawConfigParser() + config.read("sci.ini") + + sci = Sci(config) + + res, meta = sci.getLatitudeTimeHofMoellerStatsForBox(minLat, + maxLat, + minLon, + maxLon, + ds, + startTime, + endTime, + mask) + + output = open('data_lat_hoff.pkl', 'wb') + pickle.dump((res, meta), output) + output.close() + + pkl_file = open('data_lat_hoff.pkl', 'rb') + data1 = pickle.load(pkl_file) + pkl_file.close() + res, meta = 
data1 + bytes = createLatitudeHoffmueller(res, meta) + # __writeAndShowImageData(bytes, delete=delete) + + +SERIES_COLORS = ['red', 'blue'] + + +def createTimeSeries(res, meta, nseries=1): + # maxSeries = [m[0]['maxFiltered'] for m in res] + # minSeries = [m[0]['minFiltered'] for m in res] + # mean = [m[0]["meanFiltered"] for m in res] + # mean1 = [m[1]["meanFiltered"] for m in res] + # stdSeries = [m[0]['std'] for m in res] + + timeSeries = [datetime.datetime.fromtimestamp(m[0]["time"] / 1000) for m in res] + + means = [[np.nan] * len(res) for n in range(0, nseries)] + + for n in range(0, len(res)): + timeSlot = res[n] + for seriesValues in timeSlot: + means[seriesValues['ds']][n] = seriesValues['mean'] + + x = timeSeries + + fig, axMain = plt.subplots() + fig.set_size_inches(11.0, 8.5) + fig.autofmt_xdate() + + title = ', '.join(set([m['title'] for m in meta])) + sources = ', '.join(set([m['source'] for m in meta])) + dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y')) + + axMain.set_title("%s\n%s\n%s" % (title, sources, dateRange)) + axMain.set_xlabel('Date') + axMain.grid(True) + axMain.xaxis.set_major_locator(mdates.YearLocator()) + axMain.xaxis.set_major_formatter(mdates.DateFormatter('%b %Y')) + axMain.xaxis.set_minor_locator(mdates.MonthLocator()) + axMain.format_xdata = mdates.DateFormatter('%Y-%m-%d') + + plots = [] + + for n in range(0, nseries): + if n == 0: + ax = axMain + else: + ax = ax.twinx() + + plots += ax.plot(x, means[n], color=SERIES_COLORS[n], zorder=10, linewidth=3, label=meta[n]['title']) + ax.set_ylabel(meta[n]['units']) + + labs = [l.get_label() for l in plots] + axMain.legend(plots, labs, loc=0) + + sio = StringIO() + plt.savefig(sio, format='png') + return sio.getvalue() + + +def createScatterPlot(res, meta): + timeSeries = [] + series0 = [] + series1 = [] + + for m in res: + if len(m) == 2: + timeSeries.append(datetime.datetime.fromtimestamp(m[0]["time"] / 1000)) + series0.append(m[0]["mean"]) + 
series1.append(m[1]["mean"]) + + title = ', '.join(set([m['title'] for m in meta])) + sources = ', '.join(set([m['source'] for m in meta])) + dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y')) + + print len(timeSeries), len(series0), len(series1) + fig, ax = plt.subplots() + fig.set_size_inches(11.0, 8.5) + ax.scatter(series0, series1, alpha=0.5) + ax.set_xlabel(meta[0]['units']) + ax.set_ylabel(meta[1]['units']) + ax.set_title("%s\n%s\n%s" % (title, sources, dateRange)) + + par = np.polyfit(series0, series1, 1, full=True) + slope = par[0][0] + intercept = par[0][1] + xl = [min(series0), max(series0)] + yl = [slope * xx + intercept for xx in xl] + plt.plot(xl, yl, '-r') + + ax.grid(True) + fig.tight_layout() + + sio = StringIO() + plt.savefig(sio, format='png') + return sio.getvalue() + + +def createTimeSeriesTest(ds, minLat, minLon, maxLat, maxLon, startTime, endTime, mask, create=False, delete=True): + if type(ds) != list and type(ds) != tuple: + ds = (ds,) + + if create: + config = ConfigParser.RawConfigParser() + config.read("sci.ini") + sci = Sci(config) + res, meta = sci.getTimeSeriesStatsForBox(minLat, + maxLat, + minLon, + maxLon, + ds, + startTime, + endTime, + mask, + True, + True) + + output = open('data_sc_lp.pkl', 'wb') + pickle.dump((res, meta), output) + output.close() + + pkl_file = open('data_sc_lp.pkl', 'rb') + data1 = pickle.load(pkl_file) + pkl_file.close() + res, meta = data1 + bytes = createScatterPlot(res, meta) + # bytes = createTimeSeries(res, meta, nseries=len(ds)) + __writeAndShowImageData(bytes, delete=delete) + + +if __name__ == "__main__": + ds = "NCDC-L4LRblend-GLOB-AVHRR_OI" + + # minLat=21.53978071813801 + # minLon=-25.20013561141637 + # maxLat=48.57009377619356 + # maxLon=8.446972830642368 + + minLat = -2.67487472970339 + minLon = -78.13449868344182 + maxLat = 65.46403943747828 + maxLon = 4.6458350568532865 + + # minLat = -89 + # maxLat = 89 + # minLon = -179 + # maxLon = 179 + + startTime = 
1201939200000 + endTime = 1328169600000 + mask = 3 + + ds = (ds, "SSH_alti_1deg_1mon") + createTimeSeriesTest(ds, minLat, minLon, maxLat, maxLon, startTime, endTime, mask, create=False, delete=False) + + # createLatitudeHoffmuellerTest(ds, minLat, minLon, maxLat, maxLon, startTime, endTime, mask, create=False, delete=False) + # createLongitudeHoffmuellerTest(ds, minLat, minLon, maxLat, maxLon, startTime, endTime, mask, create=False, delete=False) + # __createLatLonTimeAverageMapTest(ds, minLat, minLon, maxLat, maxLon, startTime, endTime, mask, create=False, delete=False, projection='projected') http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/webapp.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/webapp.py b/analysis/webservice/webapp.py new file mode 100644 index 0000000..121624a --- /dev/null +++ b/analysis/webservice/webapp.py @@ -0,0 +1,230 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. 
All rights reserved +""" +import ConfigParser +import importlib +import json +import logging +import sys, os +import traceback +from multiprocessing.pool import ThreadPool + +import matplotlib +import pkg_resources +import tornado.web +from tornado.options import define, options, parse_command_line + +from webservice import NexusHandler +from webservice.webmodel import NexusRequestObject, NexusProcessingException + +matplotlib.use('Agg') + + +class ContentTypes(object): + CSV = "CSV" + JSON = "JSON" + XML = "XML" + PNG = "PNG" + NETCDF = "NETCDF" + ZIP = "ZIP" + + +class BaseHandler(tornado.web.RequestHandler): + path = r"/" + + def initialize(self, thread_pool): + self.logger = logging.getLogger('nexus') + self.request_thread_pool = thread_pool + + @tornado.web.asynchronous + def get(self): + + self.logger.info("Received request %s" % self._request_summary()) + self.request_thread_pool.apply_async(self.run) + + def run(self): + self.set_header("Access-Control-Allow-Origin", "*") + reqObject = NexusRequestObject(self) + try: + result = self.do_get(reqObject) + self.async_callback(result) + except NexusProcessingException as e: + self.async_onerror_callback(e.reason, e.code) + except Exception as e: + self.async_onerror_callback(str(e), 500) + + def async_onerror_callback(self, reason, code=500): + self.logger.error("Error processing request", exc_info=True) + + self.set_header("Content-Type", "application/json") + self.set_status(code) + + response = { + "error": reason, + "code": code + } + + self.write(json.dumps(response, indent=5)) + self.finish() + + def async_callback(self, result): + self.finish() + + ''' Override me for standard handlers! 
''' + + def do_get(self, reqObject): + pass + + +class ModularNexusHandlerWrapper(BaseHandler): + def initialize(self, thread_pool, clazz=None, algorithm_config=None, sc=None): + BaseHandler.initialize(self, thread_pool) + self.__algorithm_config = algorithm_config + self.__clazz = clazz + self.__sc = sc + + def do_get(self, request): + instance = self.__clazz.instance(algorithm_config=self.__algorithm_config, sc=self.__sc) + + results = instance.calc(request) + + try: + self.set_status(results.status_code) + except AttributeError: + pass + + if request.get_content_type() == ContentTypes.JSON: + self.set_header("Content-Type", "application/json") + try: + self.write(results.toJson()) + except AttributeError: + traceback.print_exc(file=sys.stdout) + self.write(json.dumps(results, indent=4)) + elif request.get_content_type() == ContentTypes.PNG: + self.set_header("Content-Type", "image/png") + try: + self.write(results.toImage()) + except AttributeError: + traceback.print_exc(file=sys.stdout) + raise NexusProcessingException(reason="Unable to convert results to an Image.") + elif request.get_content_type() == ContentTypes.CSV: + self.set_header("Content-Type", "text/csv") + self.set_header("Content-Disposition", "filename=\"%s\"" % request.get_argument('filename', "download.csv")) + try: + self.write(results.toCSV()) + except: + traceback.print_exc(file=sys.stdout) + raise NexusProcessingException(reason="Unable to convert results to CSV.") + elif request.get_content_type() == ContentTypes.NETCDF: + self.set_header("Content-Type", "application/x-netcdf") + self.set_header("Content-Disposition", "filename=\"%s\"" % request.get_argument('filename', "download.nc")) + try: + self.write(results.toNetCDF()) + except: + traceback.print_exc(file=sys.stdout) + raise NexusProcessingException(reason="Unable to convert results to NetCDF.") + elif request.get_content_type() == ContentTypes.ZIP: + self.set_header("Content-Type", "application/zip") + 
self.set_header("Content-Disposition", "filename=\"%s\"" % request.get_argument('filename', "download.zip")) + try: + self.write(results.toZip()) + except: + traceback.print_exc(file=sys.stdout) + raise NexusProcessingException(reason="Unable to convert results to Zip.") + + return results + + def async_callback(self, result): + super(ModularNexusHandlerWrapper, self).async_callback(result) + if hasattr(result, 'cleanup'): + result.cleanup() + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout) + + log = logging.getLogger(__name__) + + webconfig = ConfigParser.RawConfigParser() + webconfig.readfp(pkg_resources.resource_stream(__name__, "config/web.ini"), filename='web.ini') + + algorithm_config = ConfigParser.RawConfigParser() + algorithm_config.readfp(pkg_resources.resource_stream(__name__, "config/algorithms.ini"), filename='algorithms.ini') + + define("debug", default=False, help="run in debug mode") + define("port", default=webconfig.get("global", "server.socket_port"), help="run on the given port", type=int) + define("address", default=webconfig.get("global", "server.socket_host"), help="Bind to the given address") + parse_command_line() + + moduleDirs = webconfig.get("modules", "module_dirs").split(",") + for moduleDir in moduleDirs: + log.info("Loading modules from %s" % moduleDir) + importlib.import_module(moduleDir) + + staticDir = webconfig.get("static", "static_dir") + staticEnabled = webconfig.get("static", "static_enabled") == "true" + + log.info("Initializing on host address '%s'" % options.address) + log.info("Initializing on port '%s'" % options.port) + log.info("Starting web server in debug mode: %s" % options.debug) + if staticEnabled: + log.info("Using static root path '%s'" % staticDir) + else: + log.info("Static resources disabled") + + handlers = [] + + log.info("Running Nexus Initializers") + 
NexusHandler.executeInitializers(algorithm_config) + + max_request_threads = webconfig.getint("global", "server.max_simultaneous_requests") + log.info("Initializing request ThreadPool to %s" % max_request_threads) + request_thread_pool = ThreadPool(processes=max_request_threads) + + spark_context = None + for clazzWrapper in NexusHandler.AVAILABLE_HANDLERS: + if issubclass(clazzWrapper.clazz(), NexusHandler.SparkHandler): + if spark_context is None: + from pyspark import SparkContext, SparkConf + + # Configure Spark + sp_conf = SparkConf() + sp_conf.setAppName("nexus-analysis") + sp_conf.set("spark.scheduler.mode", "FAIR") + sp_conf.set("spark.executor.memory", "6g") + spark_context = SparkContext(conf=sp_conf) + + handlers.append( + (clazzWrapper.path(), ModularNexusHandlerWrapper, + dict(clazz=clazzWrapper, algorithm_config=algorithm_config, sc=spark_context, + thread_pool=request_thread_pool))) + else: + handlers.append( + (clazzWrapper.path(), ModularNexusHandlerWrapper, + dict(clazz=clazzWrapper, algorithm_config=algorithm_config, thread_pool=request_thread_pool))) + + + class VersionHandler(tornado.web.RequestHandler): + def get(self): + self.write(pkg_resources.get_distribution("nexusanalysis").version) + + + handlers.append((r"/version", VersionHandler)) + + if staticEnabled: + handlers.append( + (r'/(.*)', tornado.web.StaticFileHandler, {'path': staticDir, "default_filename": "index.html"})) + + app = tornado.web.Application( + handlers, + default_host=options.address, + debug=options.debug + ) + app.listen(options.port) + + log.info("Starting HTTP listener...") + tornado.ioloop.IOLoop.current().start() http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/webmodel.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/webmodel.py b/analysis/webservice/webmodel.py new file mode 100644 index 0000000..70621af --- /dev/null +++ b/analysis/webservice/webmodel.py @@ -0,0 
+1,519 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" +import re +import json +import numpy as np +from shapely.geometry import Polygon +from datetime import datetime +from decimal import Decimal +import time +import inspect +import hashlib +from pytz import UTC, timezone + +EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) + +class RequestParameters(object): + SEASONAL_CYCLE_FILTER = "seasonalFilter" + MAX_LAT = "maxLat" + MIN_LAT = "minLat" + MAX_LON = "maxLon" + MIN_LON = "minLon" + DATASET = "ds" + ENVIRONMENT = "env" + OUTPUT = "output" + START_TIME = "startTime" + END_TIME = "endTime" + START_YEAR = "startYear" + END_YEAR = "endYear" + CLIM_MONTH = "month" + START_ROW = "start" + ROW_COUNT = "numRows" + APPLY_LOW_PASS = "lowPassFilter" + LOW_CUT = "lowCut" + ORDER = "lpOrder" + PLOT_SERIES = "plotSeries" + PLOT_TYPE = "plotType" + SPARK_CFG = "spark" + +class StandardNexusErrors: + UNKNOWN = 1000 + NO_DATA = 1001 + DATASET_MISSING = 1002 + + +class NexusProcessingException(Exception): + def __init__(self, error=StandardNexusErrors.UNKNOWN, reason="", code=500): + self.error = error + self.reason = reason + self.code = code + Exception.__init__(self, reason) + + +class NoDataException(NexusProcessingException): + def __init__(self, reason="No data found for the selected timeframe"): + NexusProcessingException.__init__(self, StandardNexusErrors.NO_DATA, reason, 400) + + +class DatasetNotFoundException(NexusProcessingException): + def __init__(self, reason="Dataset not found"): + NexusProcessingException.__init__(self, StandardNexusErrors.DATASET_MISSING, reason, code=404) + + +class SparkConfig(object): + MAX_NUM_EXECS = 64 + MAX_NUM_PARTS = 8192 + DEFAULT = "local,1,1" + +class StatsComputeOptions(object): + def __init__(self): + pass + + def get_apply_seasonal_cycle_filter(self, default="false"): + raise Exception("Please implement") + + def get_max_lat(self, default=90.0): + raise 
Exception("Please implement") + + def get_min_lat(self, default=-90.0): + raise Exception("Please implement") + + def get_max_lon(self, default=180): + raise Exception("Please implement") + + def get_min_lon(self, default=-180): + raise Exception("Please implement") + + def get_dataset(self): + raise Exception("Please implement") + + def get_environment(self): + raise Exception("Please implement") + + def get_start_time(self): + raise Exception("Please implement") + + def get_end_time(self): + raise Exception("Please implement") + + def get_start_year(self): + raise Exception("Please implement") + + def get_end_year(self): + raise Exception("Please implement") + + def get_clim_month(self): + raise Exception("Please implement") + + def get_start_row(self): + raise Exception("Please implement") + + def get_end_row(self): + raise Exception("Please implement") + + def get_content_type(self): + raise Exception("Please implement") + + def get_apply_low_pass_filter(self, default=False): + raise Exception("Please implement") + + def get_low_pass_low_cut(self, default=12): + raise Exception("Please implement") + + def get_low_pass_order(self, default=9): + raise Exception("Please implement") + + def get_plot_series(self, default="mean"): + raise Exception("Please implement") + + def get_plot_type(self, default="default"): + raise Exception("Please implement") + + def get_spark_cfg (self, default=SparkConfig.DEFAULT): + raise Exception("Please implement") + + +class NexusRequestObject(StatsComputeOptions): + shortNamePattern = re.compile("^[a-zA-Z0-9_\-,\.]+$") + floatingPointPattern = re.compile('[+-]?(\d+(\.\d*)?|\.\d+)([eE][+-]?\d+)?') + + def __init__(self, reqHandler): + if reqHandler is None: + raise Exception("Request handler cannot be null") + self.requestHandler = reqHandler + StatsComputeOptions.__init__(self) + + def get_argument(self, name, default=None): + return self.requestHandler.get_argument(name, default=default) + + def get_list_int_arg(self, name, 
default=None): + arg = self.get_argument(name, default=default) + return arg.split(',') + + def __validate_is_shortname(self, v): + if v is None or len(v) == 0: + return False + return self.shortNamePattern.match(v) is not None + + def __validate_is_number(self, v): + if v is None or (type(v) == str and len(v) == 0): + return False + elif type(v) == int or type(v) == float: + return True + else: + return self.floatingPointPattern.match(v) is not None + + def get_float_arg(self, name, default=0.0): + arg = self.get_argument(name, default) + if self.__validate_is_number(arg): + return float(arg) + else: + return default + + def get_decimal_arg(self, name, default=0.0): + arg = self.get_argument(name, default) + if self.__validate_is_number(arg): + return Decimal(arg) + else: + if default is None: + return None + return Decimal(default) + + def get_int_arg(self, name, default=0): + arg = self.get_argument(name, default) + if self.__validate_is_number(arg): + return int(arg) + else: + return default + + def get_boolean_arg(self, name, default=False): + arg = self.get_argument(name, "false" if not default else "true") + return arg is not None and arg in ['true', '1', 't', 'y', 'yes', 'True', 'T', 'Y', + 'Yes', True] + + def get_datetime_arg(self, name, default=None): + time_str = self.get_argument(name, default=default) + if time_str == default: + return default + try: + dt = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC) + except ValueError: + dt = datetime.utcfromtimestamp(int(time_str)).replace(tzinfo=UTC) + return dt + + def get_apply_seasonal_cycle_filter(self, default=True): + return self.get_boolean_arg(RequestParameters.SEASONAL_CYCLE_FILTER, default=default) + + def get_max_lat(self, default=Decimal(90)): + return self.get_decimal_arg("maxLat", default) + + def get_min_lat(self, default=Decimal(-90)): + return self.get_decimal_arg("minLat", default) + + def get_max_lon(self, default=Decimal(180)): + return self.get_decimal_arg("maxLon", 
default) + + def get_min_lon(self, default=Decimal(-180)): + return self.get_decimal_arg("minLon", default) + + def get_bounding_polygon(self): + west, south, east, north = [float(b) for b in self.get_argument("b").split(",")] + polygon = Polygon([(west, south), (east, south), (east, north), (west, north), (west, south)]) + return polygon + + def get_dataset(self): + ds = self.get_argument(RequestParameters.DATASET, None) + if ds is not None and not self.__validate_is_shortname(ds): + raise Exception("Invalid shortname") + else: + return ds.split(",") + + def get_environment(self): + env = self.get_argument(RequestParameters.ENVIRONMENT, None) + if env is None and "Origin" in self.requestHandler.request.headers: + origin = self.requestHandler.request.headers["Origin"] + if origin == "http://localhost:63342": + env = "DEV" + if origin == "https://sealevel.uat.earthdata.nasa.gov": + env = "UAT" + elif origin == "https://sealevel.sit.earthdata.nasa.gov": + env = "SIT" + elif origin == "https://sealevel.earthdata.nasa.gov": + env = "PROD" + + if env not in ("DEV", "SIT", "UAT", "PROD", None): + raise Exception("Invalid Environment") + else: + return env + + def get_start_time(self): + return self.get_int_arg(RequestParameters.START_TIME, 0) + + def get_end_time(self): + return self.get_int_arg(RequestParameters.END_TIME, -1) + + def get_start_year(self): + return self.get_int_arg(RequestParameters.START_YEAR, 0) + + def get_end_year(self): + return self.get_int_arg(RequestParameters.END_YEAR, -1) + + def get_clim_month(self): + return self.get_int_arg(RequestParameters.CLIM_MONTH, -1) + + def get_start_datetime(self): + time_str = self.get_argument(RequestParameters.START_TIME) + try: + dt = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC) + except ValueError: + dt = datetime.utcfromtimestamp(int(time_str)).replace(tzinfo=UTC) + return dt + + def get_end_datetime(self): + time_str = self.get_argument(RequestParameters.END_TIME) + try: + dt = 
datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC) + except ValueError: + dt = datetime.utcfromtimestamp(int(time_str)).replace(tzinfo=UTC) + return dt + + def get_start_datetime_ms(self): + time_str = self.get_argument(RequestParameters.START_TIME) + try: + dt = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC) + except ValueError: + dt = datetime.utcfromtimestamp(int(time_str)/1000).replace(tzinfo=UTC) + return dt + + def get_end_datetime_ms(self): + time_str = self.get_argument(RequestParameters.END_TIME) + try: + dt = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC) + except ValueError: + dt = datetime.utcfromtimestamp(int(time_str)/1000).replace(tzinfo=UTC) + return dt + + def get_start_row(self): + return self.get_int_arg(RequestParameters.START_ROW, 0) + + def get_row_count(self): + return self.get_int_arg(RequestParameters.ROW_COUNT, 10) + + def get_content_type(self): + return self.get_argument(RequestParameters.OUTPUT, "JSON") + + def get_apply_low_pass_filter(self, default=True): + return self.get_boolean_arg(RequestParameters.APPLY_LOW_PASS, default) + + def get_low_pass_low_cut(self, default=12): + return self.get_float_arg(RequestParameters.LOW_CUT, default) + + def get_low_pass_order(self, default=9): + return self.get_float_arg(RequestParameters.ORDER, default) + + def get_include_meta(self): + return self.get_boolean_arg("includemeta", True) + + def get_plot_series(self, default="mean"): + return self.get_argument(RequestParameters.PLOT_SERIES, default=default) + + def get_plot_type(self, default="default"): + return self.get_argument(RequestParameters.PLOT_TYPE, default=default) + + def get_spark_cfg(self, default=SparkConfig.DEFAULT): + arg = self.get_argument(RequestParameters.SPARK_CFG, default) + try: + master,nexecs,nparts = arg.split(',') + except: + raise ValueError('Invalid spark configuration: %s' % arg) + if master not in ("local", "yarn", "mesos"): + raise ValueError('Invalid 
spark master: %s' % master) + nexecs = int(nexecs) + if (nexecs < 1) or (nexecs > SparkConfig.MAX_NUM_EXECS): + raise ValueError('Invalid number of Spark executors: %d (must be between 1 and %d)' % (nexecs, SparkConfig.MAX_NUM_EXECS)) + nparts = int(nparts) + if (nparts < 1) or (nparts > SparkConfig.MAX_NUM_PARTS): + raise ValueError('Invalid number of Spark data partitions: %d (must be between 1 and %d)' % (nparts,SparkConfig.MAX_NUM_PARTS)) + if master == "local": + master = "local[%d]" % nexecs + return master,nexecs,nparts + +class NexusResults: + def __init__(self, results=None, meta=None, stats=None, computeOptions=None, status_code=200, **args): + self.status_code = status_code + self.__results = results + self.__meta = meta if meta is not None else {} + self.__stats = stats if stats is not None else {} + self.__computeOptions = computeOptions + if computeOptions is not None: + self.__minLat = computeOptions.get_min_lat() + self.__maxLat = computeOptions.get_max_lat() + self.__minLon = computeOptions.get_min_lon() + self.__maxLon = computeOptions.get_max_lon() + self.__ds = computeOptions.get_dataset() + self.__startTime = computeOptions.get_start_time() + self.__endTime = computeOptions.get_end_time() + else: + self.__minLat = args["minLat"] if "minLat" in args else -90.0 + self.__maxLat = args["maxLat"] if "maxLat" in args else 90.0 + self.__minLon = args["minLon"] if "minLon" in args else -180.0 + self.__maxLon = args["maxLon"] if "maxLon" in args else 180.0 + self.__ds = args["ds"] if "ds" in args else None + self.__startTime = args["startTime"] if "startTime" in args else None + self.__endTime = args["endTime"] if "endTime" in args else None + + self.extendMeta(minLat=self.__minLat, + maxLat=self.__maxLat, + minLon=self.__minLon, + maxLon=self.__maxLon, + ds=self.__ds, + startTime=self.__startTime, + endTime=self.__endTime) + + def computeOptions(self): + return self.__computeOptions + + def results(self): + return self.__results + + def meta(self): + 
return self.__meta + + def stats(self): + return self.__stats + + def _extendMeta(self, meta, minLat, maxLat, minLon, maxLon, ds, startTime, endTime): + if meta is None: + return None + + meta["shortName"] = ds + if "title" in meta and "units" in meta: + meta["label"] = "%s (%s)" % (meta["title"], meta["units"]) + meta["bounds"] = { + "east": maxLon, + "west": minLon, + "north": maxLat, + "south": minLat + } + meta["time"] = { + "start": startTime, + "stop": endTime + } + return meta + + def extendMeta(self, minLat, maxLat, minLon, maxLon, ds, startTime, endTime): + if self.__meta is None: + return None + if type(ds) == list: + for i in range(0, len(ds)): + shortName = ds[i] + + if type(self.__meta) == list: + subMeta = self.__meta[i] + else: + subMeta = self.__meta # Risky + self._extendMeta(subMeta, minLat, maxLat, minLon, maxLon, shortName, startTime, endTime) + else: + if type(self.__meta) == list: + self.__meta = self.__meta[0] + else: + self.__meta = self.__meta # Risky + self._extendMeta(self.__meta, minLat, maxLat, minLon, maxLon, ds, startTime, endTime) + + def toJson(self): + data = { + 'meta': self.__meta, + 'data': self.__results, + 'stats': self.__stats + } + return json.dumps(data, indent=4, cls=CustomEncoder) + + def toImage(self): + raise Exception("Not implemented for this result type") + + +class CustomEncoder(json.JSONEncoder): + def default(self, obj): + """If input object is an ndarray it will be converted into a dict + holding dtype, shape and the data, base64 encoded. 
+ """ + numpy_types = ( + np.bool_, + # np.bytes_, -- python `bytes` class is not json serializable + # np.complex64, -- python `complex` class is not json serializable + # np.complex128, -- python `complex` class is not json serializable + # np.complex256, -- python `complex` class is not json serializable + # np.datetime64, -- python `datetime.datetime` class is not json serializable + np.float16, + np.float32, + np.float64, + # np.float128, -- special handling below + np.int8, + np.int16, + np.int32, + np.int64, + # np.object_ -- should already be evaluated as python native + np.str_, + np.uint8, + np.uint16, + np.uint32, + np.uint64, + np.void, + ) + if isinstance(obj, np.ndarray): + return obj.tolist() + elif isinstance(obj, numpy_types): + return obj.item() + elif isinstance(obj, np.float128): + return obj.astype(np.float64).item() + elif isinstance(obj, Decimal): + return str(obj) + elif isinstance(obj, datetime): + return str(obj) + elif obj is np.ma.masked: + return str(np.NaN) + # Let the base class default method raise the TypeError + return json.JSONEncoder.default(self, obj) + + +__CACHE = {} + +def cached(ttl=60000): + + def _hash_function_signature(func): + hash_object = hashlib.md5(str(inspect.getargspec(func)) + str(func)) + return hash_object.hexdigest() + + def _now(): + return int(round(time.time() * 1000)) + + def _expired(t): + if t is None or _now() - t > ttl: + return True + else: + return False + + def _cached_decorator(func): + + def func_wrapper(self, computeOptions, **args): + hash = _hash_function_signature(func) + force = computeOptions.get_boolean_arg("nocached", default=False) + + if force or hash not in __CACHE or (hash in __CACHE and _expired(__CACHE[hash]["time"])): + result = func(self, computeOptions, **args) + __CACHE[hash] = { + "time": _now(), + "result": result + } + + return __CACHE[hash]["result"] + return func_wrapper + + return _cached_decorator + + + + + 
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/client/README.md ---------------------------------------------------------------------- diff --git a/client/README.md b/client/README.md new file mode 100644 index 0000000..e69de29
