http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py 
b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
new file mode 100644
index 0000000..6b2c6a1
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -0,0 +1,554 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import calendar
+import itertools
+import logging
+import traceback
+from cStringIO import StringIO
+from datetime import datetime
+
+import matplotlib.dates as mdates
+import matplotlib.pyplot as plt
+import numpy as np
+import pytz
+import shapely.geometry
+import shapely.wkt
+from backports.functools_lru_cache import lru_cache
+from nexustiles.nexustiles import NexusTileService
+from pytz import timezone
+from scipy import stats
+
+from webservice import Filtering as filtering
+from webservice.NexusHandler import nexus_handler, SparkHandler
+from webservice.webmodel import NexusResults, NoDataException, 
NexusProcessingException
+
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
@nexus_handler
class TimeSeriesHandlerImpl(SparkHandler):
    """Spark-backed NEXUS handler that computes per-day area statistics
    (min/max/mean/count/std) for one or more datasets over a bounding box
    and time range, with optional seasonal-cycle and low-pass filtering.
    """

    name = "Time Series Spark"
    path = "/timeSeriesSpark"
    description = "Computes a time series plot between one or more datasets given an arbitrary geographical area and time range"
    params = {
        "ds": {
            "name": "Dataset",
            "type": "comma-delimited string",
            "description": "The dataset(s) Used to generate the Time Series. Required"
        },
        "startTime": {
            "name": "Start Time",
            "type": "string",
            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
        },
        "endTime": {
            "name": "End Time",
            "type": "string",
            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
        },
        "b": {
            "name": "Bounding box",
            "type": "comma-delimited float",
            "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
        },
        "seasonalFilter": {
            "name": "Compute Seasonal Cycle Filter",
            "type": "boolean",
            "description": "Flag used to specify if the seasonal averages should be computed during "
                           "Time Series computation. Optional (Default: False)"
        },
        "lowPassFilter": {
            "name": "Compute Low Pass Filter",
            "type": "boolean",
            "description": "Flag used to specify if a low pass filter should be computed during "
                           "Time Series computation. Optional (Default: True)"
        },
        "spark": {
            "name": "Spark Configuration",
            "type": "comma-delimited value",
            "description": "Configuration used to launch in the Spark cluster. Value should be 3 elements separated by "
                           "commas. 1) Spark Master 2) Number of Spark Executors 3) Number of Spark Partitions. Only "
                           "Number of Spark Partitions is used by this function. Optional (Default: local,1,1)"
        }
    }
    singleton = True

    def __init__(self):
        SparkHandler.__init__(self)
        self.log = logging.getLogger(__name__)

    def parse_arguments(self, request):
        """Validate and extract all request parameters.

        :param request: the incoming compute-options request
        :return: tuple of (ds, bounding_polygon, start_seconds_from_epoch,
                 end_seconds_from_epoch, apply_seasonal_cycle_filter,
                 apply_low_pass_filter, spark_master, spark_nexecs, spark_nparts)
        :raises NexusProcessingException: if any required argument is missing
                 or malformed (HTTP 400)
        """
        self.log.debug("Parsing arguments")

        try:
            ds = request.get_dataset()
            # Normalize a single dataset name into a 1-tuple.
            if not isinstance(ds, (list, tuple)):
                ds = (ds,)
        except Exception:
            raise NexusProcessingException(
                reason="'ds' argument is required. Must be comma-delimited string",
                code=400)

        # Do not allow time series on Climatology
        if any('CLIM' in clim for clim in ds):
            raise NexusProcessingException(reason="Cannot compute time series on a climatology", code=400)

        try:
            bounding_polygon = request.get_bounding_polygon()
            # Expose the polygon bounds through the legacy min/max accessors.
            request.get_min_lon = lambda: bounding_polygon.bounds[0]
            request.get_min_lat = lambda: bounding_polygon.bounds[1]
            request.get_max_lon = lambda: bounding_polygon.bounds[2]
            request.get_max_lat = lambda: bounding_polygon.bounds[3]
        except Exception:
            # Fall back to building a polygon from the individual bounds.
            try:
                west, south, east, north = request.get_min_lon(), request.get_min_lat(), \
                                           request.get_max_lon(), request.get_max_lat()
                bounding_polygon = shapely.geometry.Polygon(
                    [(west, south), (east, south), (east, north), (west, north), (west, south)])
            except Exception:
                raise NexusProcessingException(
                    reason="'b' argument is required. Must be comma-delimited float formatted as "
                           "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
                    code=400)

        try:
            start_time = request.get_start_datetime()
        except Exception:
            raise NexusProcessingException(
                reason="'startTime' argument is required. Can be int value seconds from epoch or "
                       "string format YYYY-MM-DDTHH:mm:ssZ",
                code=400)
        try:
            end_time = request.get_end_datetime()
        except Exception:
            raise NexusProcessingException(
                reason="'endTime' argument is required. Can be int value seconds from epoch or "
                       "string format YYYY-MM-DDTHH:mm:ssZ",
                code=400)

        if start_time > end_time:
            raise NexusProcessingException(
                reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
                    start_time.strftime(ISO_8601), end_time.strftime(ISO_8601)),
                code=400)

        apply_seasonal_cycle_filter = request.get_apply_seasonal_cycle_filter(default=False)
        apply_low_pass_filter = request.get_apply_low_pass_filter()

        spark_master, spark_nexecs, spark_nparts = request.get_spark_cfg()

        start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
        end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())

        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
               apply_seasonal_cycle_filter, apply_low_pass_filter, spark_master, spark_nexecs, spark_nparts

    def calc(self, request, **args):
        """Compute the time series for every requested dataset.

        :param request: StatsComputeOptions
        :param args: dict
        :return: TimeSeriesResults
        :raises NoDataException: if no data exists in the requested timeframe
        """
        ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
        apply_seasonal_cycle_filter, apply_low_pass_filter, spark_master, \
        spark_nexecs, spark_nparts = self.parse_arguments(request)

        resultsRaw = []

        for shortName in ds:

            the_time = datetime.now()
            daysinrange = self._tile_service.find_days_in_range_asc(bounding_polygon.bounds[1],
                                                                    bounding_polygon.bounds[3],
                                                                    bounding_polygon.bounds[0],
                                                                    bounding_polygon.bounds[2],
                                                                    shortName,
                                                                    start_seconds_from_epoch,
                                                                    end_seconds_from_epoch)
            self.log.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), shortName))

            ndays = len(daysinrange)
            if ndays == 0:
                raise NoDataException(reason="No data found for selected timeframe")

            self.log.debug('Found {0} days in range'.format(ndays))
            for i, d in enumerate(daysinrange):
                self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))

            # Never request more partitions than there are days of data.
            spark_nparts_needed = min(spark_nparts, ndays)

            the_time = datetime.now()
            results, meta = spark_driver(daysinrange, bounding_polygon, shortName,
                                         spark_nparts_needed=spark_nparts_needed, sc=self._sc)
            self.log.info("Time series calculation took %s for dataset %s" % (str(datetime.now() - the_time), shortName))

            if apply_seasonal_cycle_filter:
                the_time = datetime.now()
                for result in results:
                    month = datetime.utcfromtimestamp(result['time']).month
                    month_mean, month_max, month_min = self.calculate_monthly_average(month, bounding_polygon.wkt,
                                                                                      shortName)
                    # Seasonal values are the anomalies relative to the
                    # climatological monthly statistics.
                    result['meanSeasonal'] = result['mean'] - month_mean
                    result['minSeasonal'] = result['min'] - month_min
                    result['maxSeasonal'] = result['max'] - month_max
                self.log.info(
                    "Seasonal calculation took %s for dataset %s" % (str(datetime.now() - the_time), shortName))

            the_time = datetime.now()
            filtering.applyAllFiltersOnField(results, 'mean', applySeasonal=False, applyLowPass=apply_low_pass_filter)
            filtering.applyAllFiltersOnField(results, 'max', applySeasonal=False, applyLowPass=apply_low_pass_filter)
            filtering.applyAllFiltersOnField(results, 'min', applySeasonal=False, applyLowPass=apply_low_pass_filter)

            if apply_seasonal_cycle_filter and apply_low_pass_filter:
                try:
                    filtering.applyFiltersOnField(results, 'meanSeasonal', applySeasonal=False, applyLowPass=True,
                                                  append="LowPass")
                    filtering.applyFiltersOnField(results, 'minSeasonal', applySeasonal=False, applyLowPass=True,
                                                  append="LowPass")
                    filtering.applyFiltersOnField(results, 'maxSeasonal', applySeasonal=False, applyLowPass=True,
                                                  append="LowPass")
                except Exception:
                    # If it doesn't work log the error but ignore it
                    tb = traceback.format_exc()
                    self.log.warn("Error calculating SeasonalLowPass filter:\n%s" % tb)

            resultsRaw.append([results, meta])
            self.log.info(
                "LowPass filter calculation took %s for dataset %s" % (str(datetime.now() - the_time), shortName))

            the_time = datetime.now()
            self._create_nc_file_time1d(np.array(results), 'ts.nc', 'mean',
                                        fill=-9999.)
            self.log.info(
                "NetCDF generation took %s for dataset %s" % (str(datetime.now() - the_time), shortName))

        the_time = datetime.now()
        results = self._mergeResults(resultsRaw)

        # Comparison statistics only make sense for exactly two datasets.
        if len(ds) == 2:
            try:
                stats = TimeSeriesHandlerImpl.calculate_comparison_stats(results)
            except Exception:
                stats = {}
                tb = traceback.format_exc()
                self.log.warn("Error when calculating comparison stats:\n%s" % tb)
        else:
            stats = {}

        meta = [singleRes[1] for singleRes in resultsRaw]

        res = TimeSeriesResults(results=results, meta=meta, stats=stats,
                                computeOptions=None, minLat=bounding_polygon.bounds[1],
                                maxLat=bounding_polygon.bounds[3], minLon=bounding_polygon.bounds[0],
                                maxLon=bounding_polygon.bounds[2], ds=ds, startTime=start_seconds_from_epoch,
                                endTime=end_seconds_from_epoch)

        self.log.info("Merging results and calculating comparisons took %s" % (str(datetime.now() - the_time)))
        return res

    # NOTE: lru_cache on an instance method keys on `self` and keeps the
    # instance alive; acceptable here because the handler is a singleton.
    @lru_cache()
    def calculate_monthly_average(self, month=None, bounding_polygon_wkt=None, ds=None):
        """Compute the climatological (mean, max, min) for *month* over the
        polygon, weighted by per-tile observation counts across all years of
        the dataset.

        :param month: calendar month number (1-12)
        :param bounding_polygon_wkt: WKT string of the area of interest
               (a hashable string so the lru_cache key works)
        :param ds: dataset shortname
        :return: (monthly mean, monthly max, monthly min)
        """
        min_date, max_date = self.get_min_max_date(ds=ds)

        monthly_averages, monthly_counts = [], []
        monthly_mins, monthly_maxes = [], []
        bounding_polygon = shapely.wkt.loads(bounding_polygon_wkt)
        for year in range(min_date.year, max_date.year + 1):
            beginning_of_month = datetime(year, month, 1)
            end_of_month = datetime(year, month, calendar.monthrange(year, month)[1], 23, 59, 59)
            start = (pytz.UTC.localize(beginning_of_month) - EPOCH).total_seconds()
            end = (pytz.UTC.localize(end_of_month) - EPOCH).total_seconds()
            tile_stats = self._tile_service.find_tiles_in_polygon(bounding_polygon, ds, start, end,
                                                                  fl=('id,'
                                                                      'tile_avg_val_d,tile_count_i,'
                                                                      'tile_min_val_d,tile_max_val_d,'
                                                                      'tile_min_lat,tile_max_lat,'
                                                                      'tile_min_lon,tile_max_lon'),
                                                                  fetch_data=False)
            if len(tile_stats) == 0:
                continue

            # Split list into tiles on the border of the bounding box and tiles completely inside the bounding box.
            border_tiles, inner_tiles = [], []
            for tile in tile_stats:
                tile_box = shapely.geometry.box(tile.bbox.min_lon, tile.bbox.min_lat,
                                                tile.bbox.max_lon, tile.bbox.max_lat)
                if bounding_polygon.contains(tile_box):
                    inner_tiles.append(tile)
                else:
                    border_tiles.append(tile)

            # We can use the stats of the inner tiles directly
            tile_means = [tile.tile_stats.mean for tile in inner_tiles]
            tile_mins = [tile.tile_stats.min for tile in inner_tiles]
            tile_maxes = [tile.tile_stats.max for tile in inner_tiles]
            tile_counts = [tile.tile_stats.count for tile in inner_tiles]

            # Border tiles need to have the data loaded, masked to the
            # polygon, and their stats recalculated.
            border_tiles = list(self._tile_service.fetch_data_for_tiles(*border_tiles))
            border_tiles = self._tile_service.mask_tiles_to_polygon(bounding_polygon, border_tiles)
            for tile in border_tiles:
                tile.update_stats()
                tile_means.append(tile.tile_stats.mean)
                tile_mins.append(tile.tile_stats.min)
                tile_maxes.append(tile.tile_stats.max)
                tile_counts.append(tile.tile_stats.count)

            tile_means = np.array(tile_means)
            tile_mins = np.array(tile_mins)
            tile_maxes = np.array(tile_maxes)
            tile_counts = np.array(tile_counts)

            sum_tile_counts = np.sum(tile_counts) * 1.0

            monthly_averages += [np.average(tile_means, None, tile_counts / sum_tile_counts).item()]
            monthly_mins += [np.average(tile_mins, None, tile_counts / sum_tile_counts).item()]
            monthly_maxes += [np.average(tile_maxes, None, tile_counts / sum_tile_counts).item()]
            monthly_counts += [sum_tile_counts]

        count_sum = np.sum(monthly_counts) * 1.0
        weights = np.array(monthly_counts) / count_sum

        # BUG FIX: previously np.average(monthly_averages, ...) was returned
        # for all three positions, so callers unpacking (mean, max, min)
        # received the mean three times. Return max and min series properly.
        return np.average(monthly_averages, None, weights).item(), \
               np.average(monthly_maxes, None, weights).item(), \
               np.average(monthly_mins, None, weights).item()

    @lru_cache()
    def get_min_max_date(self, ds=None):
        """Return (min, max) data dates for *ds* as UTC ``date`` objects."""
        min_date = pytz.timezone('UTC').localize(
            datetime.utcfromtimestamp(self._tile_service.get_min_time([], ds=ds)))
        max_date = pytz.timezone('UTC').localize(
            datetime.utcfromtimestamp(self._tile_service.get_max_time([], ds=ds)))

        return min_date.date(), max_date.date()

    @staticmethod
    def calculate_comparison_stats(results):
        """Linear-regression comparison of the 'mean' series of two datasets.

        Only time slots present in BOTH datasets (len(item) == 2) contribute.

        :param results: merged results; each item is a list of per-dataset
               dicts carrying 'ds' (0 or 1) and 'mean'
        :return: dict with slope, intercept, r, p and err of the regression
        """
        xy = [[], []]

        for item in results:
            if len(item) == 2:
                xy[item[0]["ds"]].append(item[0]["mean"])
                xy[item[1]["ds"]].append(item[1]["mean"])

        slope, intercept, r_value, p_value, std_err = stats.linregress(xy[0], xy[1])
        comparisonStats = {
            "slope": slope,
            "intercept": intercept,
            "r": r_value,
            "p": p_value,
            "err": std_err
        }

        return comparisonStats
+
+
class TimeSeriesResults(NexusResults):
    """Result container for the time-series computation; renders the merged
    per-day statistics as a line plot or (for two datasets) a scatter plot.
    """

    # Supported values of the request's plot-type option.
    LINE_PLOT = "line"
    SCATTER_PLOT = "scatter"

    # One line color per dataset series; at most two series are supported.
    __SERIES_COLORS = ['red', 'blue']

    def toImage(self):
        """Render this result to PNG bytes based on the requested plot type.

        :return: PNG image data as a byte string
        :raises Exception: if the requested plot type is not supported
        """
        type = self.computeOptions().get_plot_type()

        if type == TimeSeriesResults.LINE_PLOT or type == "default":
            return self.createLinePlot()
        elif type == TimeSeriesResults.SCATTER_PLOT:
            return self.createScatterPlot()
        else:
            raise Exception("Invalid or unsupported time series plot specified")

    def createScatterPlot(self):
        """Render a scatter plot of dataset 0 vs dataset 1 values (one point
        per time slot present in both datasets), with a least-squares fit line.

        :return: PNG image data as a byte string
        """
        timeSeries = []
        series0 = []
        series1 = []

        res = self.results()
        meta = self.meta()

        # NOTE(review): `self.computeOptions` is a bound method, so this
        # `is not None` check is always true; computeOptions() itself may
        # still return None (calc() constructs this with computeOptions=None)
        # — confirm before relying on the scatter path.
        plotSeries = self.computeOptions().get_plot_series() if self.computeOptions is not None else None
        if plotSeries is None:
            plotSeries = "mean"

        # Only time slots where both datasets have a value can be compared.
        # assumes 'time' is in milliseconds here (divided by 1000) — TODO
        # confirm; the spark results store seconds.
        for m in res:
            if len(m) == 2:
                timeSeries.append(datetime.fromtimestamp(m[0]["time"] / 1000))
                series0.append(m[0][plotSeries])
                series1.append(m[1][plotSeries])

        title = ', '.join(set([m['title'] for m in meta]))
        sources = ', '.join(set([m['source'] for m in meta]))
        dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))

        fig, ax = plt.subplots()
        fig.set_size_inches(11.0, 8.5)
        ax.scatter(series0, series1, alpha=0.5)
        ax.set_xlabel(meta[0]['units'])
        ax.set_ylabel(meta[1]['units'])
        ax.set_title("%s\n%s\n%s" % (title, sources, dateRange))

        # Least-squares linear fit drawn across the x range of the data.
        par = np.polyfit(series0, series1, 1, full=True)
        slope = par[0][0]
        intercept = par[0][1]
        xl = [min(series0), max(series0)]
        yl = [slope * xx + intercept for xx in xl]
        plt.plot(xl, yl, '-r')

        # r = self.stats()["r"]
        # plt.text(0.5, 0.5, "r = foo")

        ax.grid(True)
        fig.tight_layout()

        sio = StringIO()
        plt.savefig(sio, format='png')
        return sio.getvalue()

    def createLinePlot(self):
        """Render each dataset's series as a line over time; the second
        series (if any) is drawn on a twinned y-axis.

        :return: PNG image data as a byte string
        """
        nseries = len(self.meta())
        res = self.results()
        meta = self.meta()

        timeSeries = [datetime.fromtimestamp(m[0]["time"] / 1000) for m in res]

        # Pre-fill with NaN so gaps in a series simply break the line.
        means = [[np.nan] * len(res) for n in range(0, nseries)]

        # NOTE(review): same always-true method check as createScatterPlot.
        plotSeries = self.computeOptions().get_plot_series() if self.computeOptions is not None else None
        if plotSeries is None:
            plotSeries = "mean"

        # Scatter values into per-dataset rows keyed by the 'ds' index.
        for n in range(0, len(res)):
            timeSlot = res[n]
            for seriesValues in timeSlot:
                means[seriesValues['ds']][n] = seriesValues[plotSeries]

        x = timeSeries

        fig, axMain = plt.subplots()
        fig.set_size_inches(11.0, 8.5)
        fig.autofmt_xdate()

        title = ', '.join(set([m['title'] for m in meta]))
        sources = ', '.join(set([m['source'] for m in meta]))
        dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))

        axMain.set_title("%s\n%s\n%s" % (title, sources, dateRange))
        axMain.set_xlabel('Date')
        axMain.grid(True)
        # Major ticks at year boundaries, minor ticks monthly.
        axMain.xaxis.set_major_locator(mdates.YearLocator())
        axMain.xaxis.set_major_formatter(mdates.DateFormatter('%b %Y'))
        axMain.xaxis.set_minor_locator(mdates.MonthLocator())
        axMain.format_xdata = mdates.DateFormatter('%Y-%m-%d')

        plots = []

        for n in range(0, nseries):
            if n == 0:
                ax = axMain
            else:
                # Second dataset shares the x-axis but gets its own y-axis.
                ax = ax.twinx()

            plots += ax.plot(x, means[n], color=self.__SERIES_COLORS[n], zorder=10, linewidth=3,
                             label=meta[n]['title'])
            ax.set_ylabel(meta[n]['units'])

        # Combined legend covering lines from both axes.
        labs = [l.get_label() for l in plots]
        axMain.legend(plots, labs, loc=0)

        sio = StringIO()
        plt.savefig(sio, format='png')
        return sio.getvalue()
+
+
def spark_driver(daysinrange, bounding_polygon, ds, fill=-9999., spark_nparts_needed=1, sc=None):
    """Partition the day list, compute per-day statistics on the Spark
    cluster, and return the results in ascending time order.

    :param daysinrange: ascending list of day timestamps (seconds from epoch)
    :param bounding_polygon: shapely polygon of the area of interest
    :param ds: dataset shortname
    :param fill: fill value for missing data
    :param spark_nparts_needed: number of Spark partitions to use
    :param sc: active SparkContext
    :return: (sorted list of per-day stat dicts, metadata dict)
    """
    polygon_wkt = bounding_polygon.wkt
    # One task tuple per partition; WKT is used because shapely geometries
    # serialize poorly across the Spark boundary.
    task_tuples = [(polygon_wkt, ds, list(day_chunk), fill)
                   for day_chunk in np.array_split(daysinrange, spark_nparts_needed)]

    # Launch Spark computations
    rdd = sc.parallelize(task_tuples, spark_nparts_needed)
    per_partition = rdd.map(calc_average_on_day).collect()
    daily_stats = list(itertools.chain.from_iterable(per_partition))
    daily_stats.sort(key=lambda entry: entry["time"])

    return daily_stats, {}
+
+
def calc_average_on_day(tile_in_spark):
    """Spark map function: compute per-day area statistics for one partition.

    Runs on the executors, so it creates its own NexusTileService and does a
    local import of shapely.wkt.

    :param tile_in_spark: tuple (bounding polygon WKT, dataset shortname,
           list of day timestamps in seconds from epoch, fill value)
    :return: list of dicts with keys min/max/mean/cnt/std/time, one per day
             that has unmasked data
    """
    import shapely.wkt
    (bounding_wkt, dataset, timestamps, fill) = tile_in_spark
    if len(timestamps) == 0:
        return []
    tile_service = NexusTileService()
    ds1_nexus_tiles = \
        tile_service.get_tiles_bounded_by_polygon(shapely.wkt.loads(bounding_wkt),
                                                  dataset,
                                                  timestamps[0],
                                                  timestamps[-1],
                                                  rows=5000)

    # Index tiles by their (single) timestamp so each day's tiles can be
    # gathered in one pass.
    tile_dict = {}
    for timeinseconds in timestamps:
        tile_dict[timeinseconds] = []

    for i in range(len(ds1_nexus_tiles)):
        tile = ds1_nexus_tiles[i]
        # assumes every tile covers exactly one time step (times[0]) — TODO
        # confirm against the tile service contract.
        tile_dict[tile.times[0]].append(i)

    stats_arr = []
    for timeinseconds in timestamps:
        cur_tile_list = tile_dict[timeinseconds]
        if len(cur_tile_list) == 0:
            continue
        # Flatten this day's tile data (and masks) into one masked array.
        tile_data_agg = \
            np.ma.array(data=np.hstack([ds1_nexus_tiles[i].data.data.flatten()
                                        for i in cur_tile_list
                                        if (ds1_nexus_tiles[i].times[0] ==
                                            timeinseconds)]),
                        mask=np.hstack([ds1_nexus_tiles[i].data.mask.flatten()
                                        for i in cur_tile_list
                                        if (ds1_nexus_tiles[i].times[0] ==
                                            timeinseconds)]))
        # Per-point latitudes aligned with the flattened data, used for
        # cosine-of-latitude area weighting of the mean.
        lats_agg = np.hstack([np.repeat(ds1_nexus_tiles[i].latitudes,
                                        len(ds1_nexus_tiles[i].longitudes))
                              for i in cur_tile_list
                              if (ds1_nexus_tiles[i].times[0] ==
                                  timeinseconds)])
        if (len(tile_data_agg) == 0) or tile_data_agg.mask.all():
            # Nothing usable for this day; skip it entirely.
            continue
        else:
            data_min = np.ma.min(tile_data_agg)
            data_max = np.ma.max(tile_data_agg)
            # Latitude-weighted mean approximates equal-area averaging.
            daily_mean = \
                np.ma.average(tile_data_agg,
                              weights=np.cos(np.radians(lats_agg))).item()
            data_count = np.ma.count(tile_data_agg)
            data_std = np.ma.std(tile_data_agg)

        # Return Stats by day
        stat = {
            'min': data_min,
            'max': data_max,
            'mean': daily_mean,
            'cnt': data_count,
            'std': data_std,
            'time': int(timeinseconds)
        }
        stats_arr.append(stat)
    return stats_arr

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/__init__.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms_spark/__init__.py 
b/analysis/webservice/algorithms_spark/__init__.py
new file mode 100644
index 0000000..51727dc
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/__init__.py
@@ -0,0 +1,58 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import os
+import logging
+
+log = logging.getLogger(__name__)
+
+
def module_exists(module_name):
    """Return True if *module_name* is importable, False otherwise."""
    try:
        __import__(module_name)
    except ImportError:
        return False
    return True
+
+
# Only register the Spark algorithm modules when pyspark is available;
# importing each module triggers its @nexus_handler registration.
# Each import is wrapped individually so one broken algorithm does not
# prevent the others from loading.
if module_exists("pyspark"):
    try:
        import CorrMapSpark
    except ImportError:
        pass

    try:
        import Matchup
    except ImportError:
        pass

    try:
        import TimeAvgMapSpark
    except ImportError:
        pass

    try:
        import TimeSeriesSpark
    except ImportError:
        pass

    try:
        import ClimMapSpark
    except ImportError:
        pass

    try:
        import DailyDifferenceAverageSpark
    except ImportError:
        pass

    try:
        import HofMoellerSpark
    except ImportError:
        pass


else:
    log.warn("pyspark not found. Skipping algorithms in %s" % os.path.dirname(__file__))

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/config/algorithms.ini
----------------------------------------------------------------------
diff --git a/analysis/webservice/config/algorithms.ini 
b/analysis/webservice/config/algorithms.ini
new file mode 100644
index 0000000..fae7ae5
--- /dev/null
+++ b/analysis/webservice/config/algorithms.ini
@@ -0,0 +1,5 @@
+[multiprocessing]
+maxprocesses=8
+
+[spark]
+maxconcurrentjobs=10
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/config/web.ini
----------------------------------------------------------------------
diff --git a/analysis/webservice/config/web.ini 
b/analysis/webservice/config/web.ini
new file mode 100644
index 0000000..ed9bbb7
--- /dev/null
+++ b/analysis/webservice/config/web.ini
@@ -0,0 +1,11 @@
+[global]
+server.socket_port=8083
+server.socket_host = '127.0.0.1'
+server.max_simultaneous_requests = 10
+
+[static]
+static_enabled=true
+static_dir=static
+
+[modules]
+module_dirs=webservice.algorithms,webservice.algorithms_spark,webservice.algorithms.doms
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/matserver.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/matserver.py b/analysis/webservice/matserver.py
new file mode 100644
index 0000000..5cdd8a2
--- /dev/null
+++ b/analysis/webservice/matserver.py
@@ -0,0 +1,30 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import matplotlib.pyplot as plt
+import numpy as np
+
+import mpld3
+from mpld3 import plugins
+
+fig, ax = plt.subplots()
+
+x = np.linspace(-2, 2, 20)
+y = x[:, None]
+X = np.zeros((20, 20, 4))
+
+X[:, :, 0] = np.exp(- (x - 1) ** 2 - (y) ** 2)
+X[:, :, 1] = np.exp(- (x + 0.71) ** 2 - (y - 0.71) ** 2)
+X[:, :, 2] = np.exp(- (x + 0.71) ** 2 - (y + 0.71) ** 2)
+X[:, :, 3] = np.exp(-0.25 * (x ** 2 + y ** 2))
+
+im = ax.imshow(X, extent=(10, 20, 10, 20),
+               origin='lower', zorder=1, interpolation='nearest')
+fig.colorbar(im, ax=ax)
+
+ax.set_title('An Image', size=20)
+
+plugins.connect(fig, plugins.MousePosition(fontsize=14))
+
+mpld3.show()

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/plotting.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/plotting.py b/analysis/webservice/plotting.py
new file mode 100644
index 0000000..041c597
--- /dev/null
+++ b/analysis/webservice/plotting.py
@@ -0,0 +1,552 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
# Enable transparent compilation of any Cython (.pyx) modules imported below.
import pyximport;

pyximport.install()
+
+import os
+import numpy as np
+import matplotlib.pyplot as plt
+import ConfigParser
+import matplotlib.dates as mdates
+from matplotlib import cm
+import pickle
+from cStringIO import StringIO
+import datetime
+from matplotlib.ticker import FuncFormatter
+from matplotlib.colors import LightSource
+from mpl_toolkits.basemap import Basemap
+import mpld3
+
+
+# TODO needs to be updated to use nexustiles
+
def __writeAndShowImageData(data, delete=True):
    """Write raw PNG bytes to a scratch file and display it with matplotlib.

    data   -- PNG image contents as a byte string.
    delete -- when True, remove the scratch file after showing it.
    """
    # Context manager guarantees the handle is closed even if write() fails
    # (the original opened/closed manually and could leak on error).
    with open("foo.png", "wb") as f:
        f.write(data)

    plt.clf()
    image = plt.imread("foo.png")
    plt.imshow(image)
    plt.axis('off')  # clear x- and y-axes
    plt.show()
    if delete:
        os.unlink("foo.png")
+
+
def createLatLonTimeAverageMapGlobe(res, meta, startTime=None, endTime=None):
    """Render time-averaged values on an orthographic ("globe") Basemap.

    res       -- grid of stat dicts: res[t] is a row sharing one 'lat', and each
                 cell carries 'lon' and 'avg'.
    meta      -- dict with 'title' and 'source' used for the plot heading.
    startTime -- optional datetime or epoch milliseconds for the heading.
    endTime   -- optional datetime or epoch milliseconds for the heading.

    Returns the rendered plot as PNG bytes.
    """
    latSeries = [m[0]['lat'] for m in res]
    lonSeries = [m['lon'] for m in res[0]][:]
    # data is indexed [lon][lat] to match the meshgrid orientation below.
    data = np.zeros((len(lonSeries), len(latSeries)))
    for t in range(0, len(latSeries)):
        latSet = res[t]
        for l in range(0, len(lonSeries)):
            data[l][t] = latSet[l]['avg']
    # Treat exact zeros as missing data so they are masked rather than drawn.
    # NOTE(review): this also hides legitimate 0.0 averages -- confirm intended.
    data[data == 0.0] = np.nan
    # data = np.rot90(data, 3)
    lats, lons = np.meshgrid(latSeries, lonSeries)
    masked_array = np.ma.array(data, mask=np.isnan(data))
    z = masked_array

    fig = plt.figure()
    fig.set_size_inches(11.0, 8.5)
    ax = fig.add_axes([0.05, 0.05, 0.9, 0.9])

    # Orthographic projection centered over North America.
    m = Basemap(projection='ortho', lat_0=20, lon_0=-100, resolution='l')
    # m.drawmapboundary(fill_color='0.3')
    im1 = m.pcolormesh(lons, lats, z, shading='flat', cmap=plt.cm.jet, latlon=True)
    m.drawparallels(np.arange(-90., 99., 30.))
    m.drawmeridians(np.arange(-180., 180., 60.))
    # m.drawcoastlines()
    # m.drawcountries()
    cb = m.colorbar(im1, "bottom", size="5%", pad="2%")

    title = meta['title']
    source = meta['source']
    if startTime is not None and endTime is not None:
        # Numeric times are interpreted as epoch milliseconds.
        if type(startTime) is not datetime.datetime:
            startTime = datetime.datetime.fromtimestamp(startTime / 1000)
        if type(endTime) is not datetime.datetime:
            endTime = datetime.datetime.fromtimestamp(endTime / 1000)
        dateRange = "%s - %s" % (startTime.strftime('%b %Y'), endTime.strftime('%b %Y'))
    else:
        dateRange = ""

    ax.set_title("%s\n%s\n%s" % (title, source, dateRange))
    ax.set_ylabel('Latitude')
    ax.set_xlabel('Longitude')

    # Serialize the figure to PNG in memory.
    sio = StringIO()
    plt.savefig(sio, format='png')
    return sio.getvalue()
+
+
def createLatLonTimeAverageMapProjected(res, meta, startTime=None, endTime=None):
    """Render time-averaged values on a Kavrayskiy VII ("projected") Basemap.

    Same input contract as createLatLonTimeAverageMapGlobe; differs only in
    map projection and shading ('gouraud' instead of 'flat').

    Returns the rendered plot as PNG bytes.
    """
    latSeries = [m[0]['lat'] for m in res]
    lonSeries = [m['lon'] for m in res[0]][:]
    # data is indexed [lon][lat] to match the meshgrid orientation below.
    data = np.zeros((len(lonSeries), len(latSeries)))
    for t in range(0, len(latSeries)):
        latSet = res[t]
        for l in range(0, len(lonSeries)):
            data[l][t] = latSet[l]['avg']
    # Treat exact zeros as missing data so they are masked rather than drawn.
    data[data == 0.0] = np.nan
    # data = np.rot90(data, 3)
    lats, lons = np.meshgrid(latSeries, lonSeries)
    masked_array = np.ma.array(data, mask=np.isnan(data))
    z = masked_array

    fig = plt.figure()
    fig.set_size_inches(11.0, 8.5)
    ax = fig.add_axes([0.05, 0.05, 0.9, 0.9])

    # Kavrayskiy VII world projection centered on the prime meridian.
    m = Basemap(projection='kav7', lon_0=0, resolution=None)
    # m.drawmapboundary(fill_color='0.3')
    im1 = m.pcolormesh(lons, lats, z, shading='gouraud', cmap=plt.cm.jet, latlon=True)
    m.drawparallels(np.arange(-90., 99., 30.))
    m.drawmeridians(np.arange(-180., 180., 60.))
    # m.drawcoastlines()
    # m.drawcountries()
    cb = m.colorbar(im1, "bottom", size="5%", pad="2%")

    title = meta['title']
    source = meta['source']
    if startTime is not None and endTime is not None:
        # Numeric times are interpreted as epoch milliseconds.
        if type(startTime) is not datetime.datetime:
            startTime = datetime.datetime.fromtimestamp(startTime / 1000)
        if type(endTime) is not datetime.datetime:
            endTime = datetime.datetime.fromtimestamp(endTime / 1000)
        dateRange = "%s - %s" % (startTime.strftime('%b %Y'), endTime.strftime('%b %Y'))
    else:
        dateRange = ""

    ax.set_title("%s\n%s\n%s" % (title, source, dateRange))
    ax.set_ylabel('Latitude')
    ax.set_xlabel('Longitude')

    # Serialize the figure to PNG in memory.
    sio = StringIO()
    plt.savefig(sio, format='png')
    return sio.getvalue()
+
+
def createLatLonTimeAverageMap3d(res, meta, startTime=None, endTime=None):
    """Render time-averaged values as a shaded 3D surface plot.

    res  -- grid of stat dicts (see createLatLonTimeAverageMapGlobe).
    meta, startTime, endTime are accepted for interface parity but are not
    used in the rendering.

    Returns the rendered plot as PNG bytes.
    """
    # Latitudes reversed so the first row of `data` is the northernmost.
    latSeries = [m[0]['lat'] for m in res][::-1]
    lonSeries = [m['lon'] for m in res[0]]
    data = np.zeros((len(latSeries), len(lonSeries)))
    for t in range(0, len(latSeries)):
        latSet = res[t]
        for l in range(0, len(lonSeries)):
            data[len(latSeries) - t - 1][l] = latSet[l]['avg']
    # Treat exact zeros as missing data.
    data[data == 0.0] = np.nan

    x, y = np.meshgrid(latSeries, lonSeries)
    z = data

    # Clip all arrays to a fixed 178x178 window.
    # NOTE(review): hard-coded bounds silently truncate larger grids and the
    # x/y meshgrid shape may not match z for non-square inputs -- confirm.
    region = np.s_[0:178, 0:178]
    x, y, z = x[region], y[region], z[region]

    fig, ax = plt.subplots(subplot_kw=dict(projection='3d'))

    # Hillshade the surface for depth cues (azimuth 270, altitude 45).
    ls = LightSource(270, 45)
    masked_array = np.ma.array(z, mask=np.isnan(z))
    rgb = ls.shade(masked_array, cmap=cm.gist_earth)  # , vert_exag=0.1, blend_mode='soft')
    surf = ax.plot_surface(x, y, masked_array, rstride=1, cstride=1, facecolors=rgb,
                           linewidth=0, antialiased=False, shade=False)
    # Serialize the figure to PNG in memory.
    sio = StringIO()
    plt.savefig(sio, format='png')
    return sio.getvalue()
+
+
def createLatLonTimeAverageMap(res, meta, startTime=None, endTime=None):
    """Render time-averaged values as a flat, hillshaded lat/lon image.

    res       -- grid of stat dicts (see createLatLonTimeAverageMapGlobe).
    meta      -- dict with 'title' and 'source' for the plot heading.
    startTime -- optional datetime or epoch milliseconds for the heading.
    endTime   -- optional datetime or epoch milliseconds for the heading.

    Returns the rendered plot as PNG bytes.
    """
    # Latitudes reversed so row 0 of `data` is the northernmost latitude.
    latSeries = [m[0]['lat'] for m in res][::-1]
    lonSeries = [m['lon'] for m in res[0]]

    data = np.zeros((len(latSeries), len(lonSeries)))

    for t in range(0, len(latSeries)):
        latSet = res[t]
        for l in range(0, len(lonSeries)):
            data[len(latSeries) - t - 1][l] = latSet[l]['avg']

    # Axis tick formatters: map pixel indices back to degrees (2 decimals).
    def yFormatter(y, pos):
        if y < len(latSeries):
            return '%s $^\circ$' % (int(latSeries[int(y)] * 100.0) / 100.)
        else:
            return ""

    def xFormatter(x, pos):
        if x < len(lonSeries):
            return "%s $^\circ$" % (int(lonSeries[int(x)] * 100.0) / 100.)
        else:
            return ""

    # Treat exact zeros as missing data.
    data[data == 0.0] = np.nan
    fig, ax = plt.subplots()
    fig.set_size_inches(11.0, 8.5)

    # Hillshade the field for visual relief (azimuth 315, altitude 45).
    cmap = cm.coolwarm
    ls = LightSource(315, 45)
    masked_array = np.ma.array(data, mask=np.isnan(data))
    rgb = ls.shade(masked_array, cmap)

    cax = ax.imshow(rgb, interpolation='nearest', cmap=cmap)

    ax.yaxis.set_major_formatter(FuncFormatter(yFormatter))
    ax.xaxis.set_major_formatter(FuncFormatter(xFormatter))

    title = meta['title']
    source = meta['source']
    if startTime is not None and endTime is not None:
        # Numeric times are interpreted as epoch milliseconds.
        if type(startTime) is not datetime.datetime:
            startTime = datetime.datetime.fromtimestamp(startTime / 1000)
        if type(endTime) is not datetime.datetime:
            endTime = datetime.datetime.fromtimestamp(endTime / 1000)
        dateRange = "%s - %s" % (startTime.strftime('%b %Y'), endTime.strftime('%b %Y'))
    else:
        dateRange = ""

    ax.set_title("%s\n%s\n%s" % (title, source, dateRange))
    ax.set_ylabel('Latitude')
    ax.set_xlabel('Longitude')

    fig.colorbar(cax)
    fig.autofmt_xdate()

    # Serialize the figure to PNG in memory.
    sio = StringIO()
    plt.savefig(sio, format='png')
    return sio.getvalue()
+
+
def __createLatLonTimeAverageMapTest(ds, minLat, minLon, maxLat, maxLon, startTime, endTime, mask, create=False,
                                     delete=True, projection='flat'):
    """Ad-hoc test driver: fetch (or reload cached) lat/lon time-average
    stats and render them with the selected projection.

    create     -- when True, query the backend and cache results to
                  'data_latlon_time_avg.pkl'; otherwise reuse the cache.
    projection -- one of 'flat', '3d', 'projected', 'globe'.

    Raises ValueError for an unrecognized projection (previously this fell
    through and crashed with a NameError on an undefined variable).
    """
    if create:
        config = ConfigParser.RawConfigParser()
        config.read("sci.ini")

        # NOTE(review): `Sci` is not imported in this module -- the
        # create=True path raises NameError as written; confirm the intended
        # import (module marked TODO for nexustiles migration).
        sci = Sci(config)

        res, meta = sci.getLongitudeLatitudeMapStatsForBox(minLat,
                                                           maxLat,
                                                           minLon,
                                                           maxLon,
                                                           ds,
                                                           startTime,
                                                           endTime,
                                                           mask)

        # Context managers close the pickle files even on error.
        with open('data_latlon_time_avg.pkl', 'wb') as output:
            pickle.dump((res, meta), output)

    with open('data_latlon_time_avg.pkl', 'rb') as pkl_file:
        res, meta = pickle.load(pkl_file)

    # Renamed from `bytes` to avoid shadowing the builtin.
    if projection == 'flat':
        image_bytes = createLatLonTimeAverageMap(res, meta, startTime, endTime)
    elif projection == '3d':
        image_bytes = createLatLonTimeAverageMap3d(res, meta, startTime, endTime)
    elif projection == 'projected':
        image_bytes = createLatLonTimeAverageMapProjected(res, meta, startTime, endTime)
    elif projection == 'globe':
        image_bytes = createLatLonTimeAverageMapGlobe(res, meta, startTime, endTime)
    else:
        raise ValueError("Unknown projection: %s" % projection)
    __writeAndShowImageData(image_bytes, delete=delete)
+
+
def createHoffmueller(data, coordSeries, timeSeries, coordName, title, interpolate='nearest'):
    """Display a Hovmoeller (coordinate vs. time) matrix interactively via mpld3.

    data        -- 2D array, rows indexed by coordinate, columns by time.
    coordSeries -- coordinate values (degrees) for the y-axis labels.
    timeSeries  -- datetimes for the x-axis labels.
    coordName   -- y-axis label ('Latitude' or 'Longitude').
    title       -- plot title.
    interpolate -- imshow interpolation mode.

    NOTE: this opens an interactive mpld3 browser view and returns None; the
    PNG-serialization path is commented out below.
    """
    cmap = cm.coolwarm
    # ls = LightSource(315, 45)
    # rgb = ls.shade(data, cmap)

    fig, ax = plt.subplots()
    fig.set_size_inches(11.0, 8.5)
    cax = ax.imshow(data, interpolation=interpolate, cmap=cmap)

    # Tick formatters: translate pixel indices back to degrees / month labels.
    def yFormatter(y, pos):
        if y < len(coordSeries):
            return "%s $^\circ$" % (int(coordSeries[int(y)] * 100.0) / 100.)
        else:
            return ""

    def xFormatter(x, pos):
        if x < len(timeSeries):
            return timeSeries[int(x)].strftime('%b %Y')
        else:
            return ""

    ax.xaxis.set_major_formatter(FuncFormatter(xFormatter))
    ax.yaxis.set_major_formatter(FuncFormatter(yFormatter))

    ax.set_title(title)
    ax.set_ylabel(coordName)
    ax.set_xlabel('Date')

    fig.colorbar(cax)
    fig.autofmt_xdate()

    labels = ['point {0}'.format(i + 1) for i in range(len(data))]
    # plugins.connect(fig, plugins.MousePosition(fontsize=14))
    # NOTE(review): PointLabelTooltip is normally attached to a scatter/line
    # artist; attaching it to an AxesImage may not behave as expected.
    tooltip = mpld3.plugins.PointLabelTooltip(cax, labels=labels)
    mpld3.plugins.connect(fig, tooltip)
    mpld3.show()
    # sio = StringIO()
    # plt.savefig(sio, format='png')
    # return sio.getvalue()
+
+
def createLongitudeHoffmueller(res, meta):
    """Build a longitude-vs-time Hovmoeller matrix and hand it to createHoffmueller."""
    lonSeries = [entry['longitude'] for entry in res[0]['lons']]
    timeSeries = [datetime.datetime.fromtimestamp(step['time'] / 1000) for step in res]

    data = np.zeros((len(lonSeries), len(timeSeries)))

    last_row = len(lonSeries) - 1
    for col, timeSet in enumerate(res):
        for row in range(0, len(lonSeries)):
            # Rows are flipped so the first longitude lands at the bottom.
            data[last_row - row][col] = timeSet['lons'][row]['avg']

    dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))
    plotTitle = "%s\n%s\n%s" % (meta['title'], meta['source'], dateRange)

    return createHoffmueller(data, lonSeries, timeSeries, "Longitude", plotTitle,
                             interpolate='nearest')
+
+
def createLongitudeHoffmuellerTest(ds, minLat, minLon, maxLat, maxLon, startTime, endTime, mask, create=False,
                                   delete=True):
    """Ad-hoc test driver: fetch (or reload cached) longitude Hovmoeller
    stats and display them.

    create -- when True, query the backend and cache results to
              'data_lon_hoff.pkl'; otherwise reuse the cached pickle.
    """
    if create:
        config = ConfigParser.RawConfigParser()
        config.read("sci.ini")

        # NOTE(review): `Sci` is not imported in this module; the create=True
        # path raises NameError as written -- confirm the intended import.
        sci = Sci(config)

        res, meta = sci.getLongitudeTimeHofMoellerStatsForBox(minLat,
                                                              maxLat,
                                                              minLon,
                                                              maxLon,
                                                              ds,
                                                              startTime,
                                                              endTime,
                                                              mask)

        # Context managers close the pickle files even on error.
        with open('data_lon_hoff.pkl', 'wb') as output:
            pickle.dump((res, meta), output)

    with open('data_lon_hoff.pkl', 'rb') as pkl_file:
        res, meta = pickle.load(pkl_file)
    # NOTE(review): createLongitudeHoffmueller currently displays via mpld3
    # and returns None, so the write below would receive None -- confirm.
    image_bytes = createLongitudeHoffmueller(res, meta)
    __writeAndShowImageData(image_bytes, delete=delete)
+
+
def createLatitudeHoffmueller(res, meta):
    """Build a latitude-vs-time Hovmoeller matrix and hand it to createHoffmueller."""
    latSeries = [entry['latitude'] for entry in res[0]['lats']]
    timeSeries = [datetime.datetime.fromtimestamp(step['time'] / 1000) for step in res]

    data = np.zeros((len(latSeries), len(timeSeries)))

    last_row = len(latSeries) - 1
    for col, timeSet in enumerate(res):
        for row in range(0, len(latSeries)):
            # Rows are flipped so the first latitude lands at the bottom.
            data[last_row - row][col] = timeSet['lats'][row]['avg']

    dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))
    plotTitle = "%s\n%s\n%s" % (meta['title'], meta['source'], dateRange)

    return createHoffmueller(data, latSeries, timeSeries, "Latitude", title=plotTitle,
                             interpolate='nearest')
+
+
def createLatitudeHoffmuellerTest(ds, minLat, minLon, maxLat, maxLon, startTime, endTime, mask, create=False,
                                  delete=True):
    """Ad-hoc test driver: fetch (or reload cached) latitude Hovmoeller
    stats and render them.

    create -- when True, query the backend and cache results to
              'data_lat_hoff.pkl'; otherwise reuse the cached pickle.
    """
    if create:
        config = ConfigParser.RawConfigParser()
        config.read("sci.ini")

        # NOTE(review): `Sci` is not imported in this module; the create=True
        # path raises NameError as written -- confirm the intended import.
        sci = Sci(config)

        res, meta = sci.getLatitudeTimeHofMoellerStatsForBox(minLat,
                                                             maxLat,
                                                             minLon,
                                                             maxLon,
                                                             ds,
                                                             startTime,
                                                             endTime,
                                                             mask)

        # Context managers close the pickle files even on error.
        with open('data_lat_hoff.pkl', 'wb') as output:
            pickle.dump((res, meta), output)

    with open('data_lat_hoff.pkl', 'rb') as pkl_file:
        res, meta = pickle.load(pkl_file)
    # The plot is shown interactively by createHoffmueller; the file-based
    # display remains disabled as in the original.
    image_bytes = createLatitudeHoffmueller(res, meta)
    # __writeAndShowImageData(image_bytes, delete=delete)
+
+
# Colors assigned to successive series; reused cyclically when there are
# more series than colors.
SERIES_COLORS = ['red', 'blue']


def createTimeSeries(res, meta, nseries=1):
    """Render one or more dataset means as a time-series line plot.

    res     -- list of time slots; each slot is a list of per-dataset stat
               dicts with 'time' (epoch millis), 'ds' (series index) and 'mean'.
    meta    -- list of per-series metadata dicts with 'title', 'source', 'units'.
    nseries -- number of series to draw (each after the first gets a twinned
               y-axis).

    Returns the rendered plot as PNG bytes.
    """
    timeSeries = [datetime.datetime.fromtimestamp(m[0]["time"] / 1000) for m in res]

    # means[s][t] is the mean of series s at time slot t; gaps stay NaN.
    means = [[np.nan] * len(res) for n in range(0, nseries)]

    for n in range(0, len(res)):
        timeSlot = res[n]
        for seriesValues in timeSlot:
            means[seriesValues['ds']][n] = seriesValues['mean']

    x = timeSeries

    fig, axMain = plt.subplots()
    fig.set_size_inches(11.0, 8.5)
    fig.autofmt_xdate()

    title = ', '.join(set([m['title'] for m in meta]))
    sources = ', '.join(set([m['source'] for m in meta]))
    dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))

    axMain.set_title("%s\n%s\n%s" % (title, sources, dateRange))
    axMain.set_xlabel('Date')
    axMain.grid(True)
    axMain.xaxis.set_major_locator(mdates.YearLocator())
    axMain.xaxis.set_major_formatter(mdates.DateFormatter('%b %Y'))
    axMain.xaxis.set_minor_locator(mdates.MonthLocator())
    axMain.format_xdata = mdates.DateFormatter('%Y-%m-%d')

    plots = []

    for n in range(0, nseries):
        if n == 0:
            ax = axMain
        else:
            # Each additional series is plotted against its own y-axis,
            # twinned off the previous one.
            ax = ax.twinx()

        # Cycle the palette instead of indexing past its end (previously an
        # IndexError for nseries > len(SERIES_COLORS)).
        plots += ax.plot(x, means[n], color=SERIES_COLORS[n % len(SERIES_COLORS)], zorder=10,
                         linewidth=3, label=meta[n]['title'])
        ax.set_ylabel(meta[n]['units'])

    labs = [l.get_label() for l in plots]
    axMain.legend(plots, labs, loc=0)

    # Serialize the figure to PNG in memory.
    sio = StringIO()
    plt.savefig(sio, format='png')
    return sio.getvalue()
+
+
def createScatterPlot(res, meta):
    """Scatter-plot two datasets' means against each other with a linear fit.

    res  -- list of time slots; only slots containing exactly two per-dataset
            stat dicts (one per series) are plotted.
    meta -- list of two metadata dicts with 'title', 'source', 'units'.

    Returns the rendered plot as PNG bytes.
    """
    timeSeries = []
    series0 = []
    series1 = []

    for m in res:
        if len(m) == 2:
            timeSeries.append(datetime.datetime.fromtimestamp(m[0]["time"] / 1000))
            series0.append(m[0]["mean"])
            series1.append(m[1]["mean"])

    title = ', '.join(set([m['title'] for m in meta]))
    sources = ', '.join(set([m['source'] for m in meta]))
    dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))

    # (removed leftover debug print of the series lengths)
    fig, ax = plt.subplots()
    fig.set_size_inches(11.0, 8.5)
    ax.scatter(series0, series1, alpha=0.5)
    ax.set_xlabel(meta[0]['units'])
    ax.set_ylabel(meta[1]['units'])
    ax.set_title("%s\n%s\n%s" % (title, sources, dateRange))

    # Overlay a least-squares linear fit in red.
    par = np.polyfit(series0, series1, 1, full=True)
    slope = par[0][0]
    intercept = par[0][1]
    xl = [min(series0), max(series0)]
    yl = [slope * xx + intercept for xx in xl]
    plt.plot(xl, yl, '-r')

    ax.grid(True)
    fig.tight_layout()

    # Serialize the figure to PNG in memory.
    sio = StringIO()
    plt.savefig(sio, format='png')
    return sio.getvalue()
+
+
def createTimeSeriesTest(ds, minLat, minLon, maxLat, maxLon, startTime, endTime, mask, create=False, delete=True):
    """Ad-hoc test driver: fetch (or reload cached) time-series stats and
    display a scatter plot of the two series.

    create -- when True, query the backend and cache results to
              'data_sc_lp.pkl'; otherwise reuse the cached pickle.
    """
    # Normalize a single dataset name into a one-element tuple.
    if not isinstance(ds, (list, tuple)):
        ds = (ds,)

    if create:
        config = ConfigParser.RawConfigParser()
        config.read("sci.ini")
        # NOTE(review): `Sci` is not imported in this module; the create=True
        # path raises NameError as written -- confirm the intended import.
        sci = Sci(config)
        res, meta = sci.getTimeSeriesStatsForBox(minLat,
                                                 maxLat,
                                                 minLon,
                                                 maxLon,
                                                 ds,
                                                 startTime,
                                                 endTime,
                                                 mask,
                                                 True,
                                                 True)

        # Context managers close the pickle files even on error.
        with open('data_sc_lp.pkl', 'wb') as output:
            pickle.dump((res, meta), output)

    with open('data_sc_lp.pkl', 'rb') as pkl_file:
        res, meta = pickle.load(pkl_file)
    # Renamed from `bytes` to avoid shadowing the builtin.
    image_bytes = createScatterPlot(res, meta)
    # image_bytes = createTimeSeries(res, meta, nseries=len(ds))
    __writeAndShowImageData(image_bytes, delete=delete)
+
+
# Manual test harness: exercises the plotting helpers against a cached
# pickle of backend results (see the *Test drivers above).
if __name__ == "__main__":
    ds = "NCDC-L4LRblend-GLOB-AVHRR_OI"

    # minLat=21.53978071813801
    # minLon=-25.20013561141637
    # maxLat=48.57009377619356
    # maxLon=8.446972830642368

    # Bounding box roughly covering the Atlantic.
    minLat = -2.67487472970339
    minLon = -78.13449868344182
    maxLat = 65.46403943747828
    maxLon = 4.6458350568532865

    # minLat = -89
    # maxLat = 89
    # minLon = -179
    # maxLon = 179

    # Time range in epoch milliseconds (Feb 2008 - Feb 2012).
    startTime = 1201939200000
    endTime = 1328169600000
    mask = 3

    ds = (ds, "SSH_alti_1deg_1mon")
    createTimeSeriesTest(ds, minLat, minLon, maxLat, maxLon, startTime, endTime, mask, create=False, delete=False)

    # createLatitudeHoffmuellerTest(ds, minLat, minLon, maxLat, maxLon, startTime, endTime, mask, create=False, delete=False)
    # createLongitudeHoffmuellerTest(ds, minLat, minLon, maxLat, maxLon, startTime, endTime, mask, create=False, delete=False)
    # __createLatLonTimeAverageMapTest(ds, minLat, minLon, maxLat, maxLon, startTime, endTime, mask, create=False, delete=False, projection='projected')

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/webapp.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/webapp.py b/analysis/webservice/webapp.py
new file mode 100644
index 0000000..121624a
--- /dev/null
+++ b/analysis/webservice/webapp.py
@@ -0,0 +1,230 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import ConfigParser
+import importlib
+import json
+import logging
+import sys, os
+import traceback
+from multiprocessing.pool import ThreadPool
+
+import matplotlib
+import pkg_resources
+import tornado.web
+from tornado.options import define, options, parse_command_line
+
+from webservice import NexusHandler
+from webservice.webmodel import NexusRequestObject, NexusProcessingException
+
+matplotlib.use('Agg')
+
+
class ContentTypes(object):
    """Response serialization formats; compared against the request's
    content type in ModularNexusHandlerWrapper.do_get()."""
    CSV = "CSV"
    JSON = "JSON"
    XML = "XML"
    PNG = "PNG"
    NETCDF = "NETCDF"
    ZIP = "ZIP"
+
+
class BaseHandler(tornado.web.RequestHandler):
    """Base Tornado handler that dispatches each GET onto a worker thread pool.

    Subclasses implement do_get(); failures are returned to the client as a
    JSON error body with an appropriate HTTP status.
    """
    path = r"/"

    def initialize(self, thread_pool):
        # Tornado calls initialize() with the kwargs registered for the route.
        self.logger = logging.getLogger('nexus')
        self.request_thread_pool = thread_pool

    @tornado.web.asynchronous
    def get(self):
        # Keep the connection open and process the request on the thread
        # pool; run() is responsible for eventually calling finish().
        # NOTE(review): @tornado.web.asynchronous is removed in Tornado 6 --
        # confirm the pinned Tornado version.

        self.logger.info("Received request %s" % self._request_summary())
        self.request_thread_pool.apply_async(self.run)

    def run(self):
        """Worker-thread entry point: run do_get() and complete the response."""
        self.set_header("Access-Control-Allow-Origin", "*")
        reqObject = NexusRequestObject(self)
        try:
            result = self.do_get(reqObject)
            self.async_callback(result)
        except NexusProcessingException as e:
            # Application-level failure: report its reason and HTTP code.
            self.async_onerror_callback(e.reason, e.code)
        except Exception as e:
            # Unexpected failure: report as an internal server error.
            self.async_onerror_callback(str(e), 500)

    def async_onerror_callback(self, reason, code=500):
        """Send a JSON error payload and finish the request."""
        self.logger.error("Error processing request", exc_info=True)

        self.set_header("Content-Type", "application/json")
        self.set_status(code)

        response = {
            "error": reason,
            "code": code
        }

        self.write(json.dumps(response, indent=5))
        self.finish()

    def async_callback(self, result):
        # Success path: subclasses may override to add post-response cleanup.
        self.finish()

    ''' Override me for standard handlers! '''

    def do_get(self, reqObject):
        # Default no-op; subclasses compute and write the actual response.
        pass
+
+
class ModularNexusHandlerWrapper(BaseHandler):
    """Adapts a registered algorithm handler class to a Tornado endpoint,
    serializing its results according to the requested content type."""

    def initialize(self, thread_pool, clazz=None, algorithm_config=None, sc=None):
        BaseHandler.initialize(self, thread_pool)
        self.__algorithm_config = algorithm_config
        self.__clazz = clazz
        self.__sc = sc

    def do_get(self, request):
        """Run the wrapped algorithm and write its results in the requested format.

        Raises NexusProcessingException when the results object cannot be
        serialized to the requested content type.
        """
        instance = self.__clazz.instance(algorithm_config=self.__algorithm_config, sc=self.__sc)

        results = instance.calc(request)

        try:
            self.set_status(results.status_code)
        except AttributeError:
            # Results without an explicit status code default to 200.
            pass

        if request.get_content_type() == ContentTypes.JSON:
            self.set_header("Content-Type", "application/json")
            try:
                self.write(results.toJson())
            except AttributeError:
                # Fall back to generic JSON serialization of the raw results.
                traceback.print_exc(file=sys.stdout)
                self.write(json.dumps(results, indent=4))
        elif request.get_content_type() == ContentTypes.PNG:
            self.set_header("Content-Type", "image/png")
            try:
                self.write(results.toImage())
            except AttributeError:
                traceback.print_exc(file=sys.stdout)
                raise NexusProcessingException(reason="Unable to convert results to an Image.")
        elif request.get_content_type() == ContentTypes.CSV:
            self.set_header("Content-Type", "text/csv")
            self.set_header("Content-Disposition", "filename=\"%s\"" % request.get_argument('filename', "download.csv"))
            try:
                self.write(results.toCSV())
            # Was a bare `except:`, which would also swallow SystemExit and
            # KeyboardInterrupt; narrowed to Exception.
            except Exception:
                traceback.print_exc(file=sys.stdout)
                raise NexusProcessingException(reason="Unable to convert results to CSV.")
        elif request.get_content_type() == ContentTypes.NETCDF:
            self.set_header("Content-Type", "application/x-netcdf")
            self.set_header("Content-Disposition", "filename=\"%s\"" % request.get_argument('filename', "download.nc"))
            try:
                self.write(results.toNetCDF())
            except Exception:
                traceback.print_exc(file=sys.stdout)
                raise NexusProcessingException(reason="Unable to convert results to NetCDF.")
        elif request.get_content_type() == ContentTypes.ZIP:
            self.set_header("Content-Type", "application/zip")
            self.set_header("Content-Disposition", "filename=\"%s\"" % request.get_argument('filename', "download.zip"))
            try:
                self.write(results.toZip())
            except Exception:
                traceback.print_exc(file=sys.stdout)
                raise NexusProcessingException(reason="Unable to convert results to Zip.")

        return results

    def async_callback(self, result):
        # Finish the response, then release any resources the results hold.
        super(ModularNexusHandlerWrapper, self).async_callback(result)
        if hasattr(result, 'cleanup'):
            result.cleanup()
+
+
# Application entry point: load configuration, import algorithm modules,
# build the Tornado route table (including Spark-backed handlers), and start
# the HTTP server.
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)

    log = logging.getLogger(__name__)

    # Configuration files are read from the installed package's data.
    webconfig = ConfigParser.RawConfigParser()
    webconfig.readfp(pkg_resources.resource_stream(__name__, "config/web.ini"), filename='web.ini')

    algorithm_config = ConfigParser.RawConfigParser()
    algorithm_config.readfp(pkg_resources.resource_stream(__name__, "config/algorithms.ini"), filename='algorithms.ini')

    # Command-line flags default to the values from web.ini.
    define("debug", default=False, help="run in debug mode")
    define("port", default=webconfig.get("global", "server.socket_port"), help="run on the given port", type=int)
    define("address", default=webconfig.get("global", "server.socket_host"), help="Bind to the given address")
    parse_command_line()

    # Importing each module triggers @nexus_handler registration.
    moduleDirs = webconfig.get("modules", "module_dirs").split(",")
    for moduleDir in moduleDirs:
        log.info("Loading modules from %s" % moduleDir)
        importlib.import_module(moduleDir)

    staticDir = webconfig.get("static", "static_dir")
    # NOTE(review): case-sensitive comparison -- "True" in the ini would
    # disable static serving; consider getboolean().
    staticEnabled = webconfig.get("static", "static_enabled") == "true"

    log.info("Initializing on host address '%s'" % options.address)
    log.info("Initializing on port '%s'" % options.port)
    log.info("Starting web server in debug mode: %s" % options.debug)
    if staticEnabled:
        log.info("Using static root path '%s'" % staticDir)
    else:
        log.info("Static resources disabled")

    handlers = []

    log.info("Running Nexus Initializers")
    NexusHandler.executeInitializers(algorithm_config)

    # All requests share one thread pool, sized from configuration.
    max_request_threads = webconfig.getint("global", "server.max_simultaneous_requests")
    log.info("Initializing request ThreadPool to %s" % max_request_threads)
    request_thread_pool = ThreadPool(processes=max_request_threads)

    # A single SparkContext is created lazily, only if at least one
    # registered handler is Spark-based.
    spark_context = None
    for clazzWrapper in NexusHandler.AVAILABLE_HANDLERS:
        if issubclass(clazzWrapper.clazz(), NexusHandler.SparkHandler):
            if spark_context is None:
                from pyspark import SparkContext, SparkConf

                # Configure Spark
                sp_conf = SparkConf()
                sp_conf.setAppName("nexus-analysis")
                sp_conf.set("spark.scheduler.mode", "FAIR")
                sp_conf.set("spark.executor.memory", "6g")
                spark_context = SparkContext(conf=sp_conf)

            handlers.append(
                (clazzWrapper.path(), ModularNexusHandlerWrapper,
                 dict(clazz=clazzWrapper, algorithm_config=algorithm_config, sc=spark_context,
                      thread_pool=request_thread_pool)))
        else:
            handlers.append(
                (clazzWrapper.path(), ModularNexusHandlerWrapper,
                 dict(clazz=clazzWrapper, algorithm_config=algorithm_config, thread_pool=request_thread_pool)))


    class VersionHandler(tornado.web.RequestHandler):
        # Reports the installed package version at /version.
        def get(self):
            self.write(pkg_resources.get_distribution("nexusanalysis").version)


    handlers.append((r"/version", VersionHandler))

    # Catch-all static file route must be registered last.
    if staticEnabled:
        handlers.append(
            (r'/(.*)', tornado.web.StaticFileHandler, {'path': staticDir, "default_filename": "index.html"}))

    app = tornado.web.Application(
        handlers,
        default_host=options.address,
        debug=options.debug
    )
    app.listen(options.port)

    log.info("Starting HTTP listener...")
    # NOTE(review): tornado.ioloop is only reachable here because
    # `import tornado.web` imports it internally; an explicit
    # `import tornado.ioloop` would be safer.
    tornado.ioloop.IOLoop.current().start()

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/webmodel.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/webmodel.py b/analysis/webservice/webmodel.py
new file mode 100644
index 0000000..70621af
--- /dev/null
+++ b/analysis/webservice/webmodel.py
@@ -0,0 +1,519 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import re
+import json
+import numpy as np
+from shapely.geometry import Polygon
+from datetime import datetime
+from decimal import Decimal
+import time
+import inspect
+import hashlib
+from pytz import UTC, timezone
+
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+
class RequestParameters(object):
    """Canonical HTTP query-string parameter names accepted by the web service endpoints."""
    SEASONAL_CYCLE_FILTER = "seasonalFilter"
    # Spatial bounding box
    MAX_LAT = "maxLat"
    MIN_LAT = "minLat"
    MAX_LON = "maxLon"
    MIN_LON = "minLon"
    DATASET = "ds"
    ENVIRONMENT = "env"
    OUTPUT = "output"
    # Temporal range
    START_TIME = "startTime"
    END_TIME = "endTime"
    START_YEAR = "startYear"
    END_YEAR = "endYear"
    CLIM_MONTH = "month"
    # Result paging
    START_ROW = "start"
    ROW_COUNT = "numRows"
    # Low-pass filter options
    APPLY_LOW_PASS = "lowPassFilter"
    LOW_CUT = "lowCut"
    ORDER = "lpOrder"
    # Plotting options
    PLOT_SERIES = "plotSeries"
    PLOT_TYPE = "plotType"
    # Spark configuration ("master,numExecutors,numPartitions")
    SPARK_CFG = "spark"
+
class StandardNexusErrors:
    """Application-level error codes returned alongside HTTP status codes."""
    UNKNOWN = 1000
    NO_DATA = 1001
    DATASET_MISSING = 1002
+
+
class NexusProcessingException(Exception):
    """Base exception for web-service processing failures.

    Carries an application error code (see StandardNexusErrors) and the
    HTTP status code that should be returned to the client.
    """

    def __init__(self, error=StandardNexusErrors.UNKNOWN, reason="", code=500):
        super(NexusProcessingException, self).__init__(reason)
        self.error = error
        self.reason = reason
        self.code = code
+
+
class NoDataException(NexusProcessingException):
    """Raised when a query matches no data; maps to HTTP 400."""

    def __init__(self, reason="No data found for the selected timeframe"):
        super(NoDataException, self).__init__(
            error=StandardNexusErrors.NO_DATA, reason=reason, code=400)
+
+
class DatasetNotFoundException(NexusProcessingException):
    """Raised when the requested dataset does not exist; maps to HTTP 404."""

    def __init__(self, reason="Dataset not found"):
        super(DatasetNotFoundException, self).__init__(
            error=StandardNexusErrors.DATASET_MISSING, reason=reason, code=404)
+
+
class SparkConfig(object):
    """Limits and defaults for the user-supplied Spark configuration
    string ("master,numExecutors,numPartitions")."""
    # Upper bound on executors requested via the 'spark' parameter
    MAX_NUM_EXECS = 64
    # Upper bound on data partitions requested via the 'spark' parameter
    MAX_NUM_PARTS = 8192
    DEFAULT = "local,1,1"
+
class StatsComputeOptions(object):
    """Abstract accessor interface for the parameters of a statistics
    computation.

    Concrete subclasses (e.g. NexusRequestObject) extract the values from
    an actual source such as an HTTP request.  Every accessor raises
    NotImplementedError until overridden; NotImplementedError is a subclass
    of Exception, so existing callers that caught the generic Exception
    previously raised here are unaffected.
    """

    def __init__(self):
        pass

    def get_apply_seasonal_cycle_filter(self, default="false"):
        raise NotImplementedError("Please implement")

    def get_max_lat(self, default=90.0):
        raise NotImplementedError("Please implement")

    def get_min_lat(self, default=-90.0):
        raise NotImplementedError("Please implement")

    def get_max_lon(self, default=180):
        raise NotImplementedError("Please implement")

    def get_min_lon(self, default=-180):
        raise NotImplementedError("Please implement")

    def get_dataset(self):
        raise NotImplementedError("Please implement")

    def get_environment(self):
        raise NotImplementedError("Please implement")

    def get_start_time(self):
        raise NotImplementedError("Please implement")

    def get_end_time(self):
        raise NotImplementedError("Please implement")

    def get_start_year(self):
        raise NotImplementedError("Please implement")

    def get_end_year(self):
        raise NotImplementedError("Please implement")

    def get_clim_month(self):
        raise NotImplementedError("Please implement")

    def get_start_row(self):
        raise NotImplementedError("Please implement")

    def get_end_row(self):
        raise NotImplementedError("Please implement")

    def get_content_type(self):
        raise NotImplementedError("Please implement")

    def get_apply_low_pass_filter(self, default=False):
        raise NotImplementedError("Please implement")

    def get_low_pass_low_cut(self, default=12):
        raise NotImplementedError("Please implement")

    def get_low_pass_order(self, default=9):
        raise NotImplementedError("Please implement")

    def get_plot_series(self, default="mean"):
        raise NotImplementedError("Please implement")

    def get_plot_type(self, default="default"):
        raise NotImplementedError("Please implement")

    def get_spark_cfg(self, default=SparkConfig.DEFAULT):
        raise NotImplementedError("Please implement")
+
+
class NexusRequestObject(StatsComputeOptions):
    """Adapts a tornado request handler to the StatsComputeOptions interface.

    All ``get_*`` accessors read, validate and coerce HTTP query parameters
    from the wrapped handler.
    """
    # Dataset shortnames: word characters, '-', ',' or '.' only.
    shortNamePattern = re.compile(r"^[a-zA-Z0-9_\-,\.]+$")
    floatingPointPattern = re.compile(r'[+-]?(\d+(\.\d*)?|\.\d+)([eE][+-]?\d+)?')

    def __init__(self, reqHandler):
        if reqHandler is None:
            raise Exception("Request handler cannot be null")
        self.requestHandler = reqHandler
        StatsComputeOptions.__init__(self)

    def get_argument(self, name, default=None):
        # Raw string value of the query parameter, or `default` when absent.
        return self.requestHandler.get_argument(name, default=default)

    def get_list_int_arg(self, name, default=None):
        # NOTE(review): despite the name, this returns the comma-separated
        # *string* items without int conversion; kept as-is for callers that
        # may depend on it.
        arg = self.get_argument(name, default=default)
        return arg.split(',')

    def __validate_is_shortname(self, v):
        """True when v is a non-empty string matching shortNamePattern."""
        if v is None or len(v) == 0:
            return False
        return self.shortNamePattern.match(v) is not None

    def __validate_is_number(self, v):
        """True when v is an int/float or a string that parses as a number."""
        if v is None or (type(v) == str and len(v) == 0):
            return False
        elif type(v) == int or type(v) == float:
            return True
        else:
            return self.floatingPointPattern.match(v) is not None

    def get_float_arg(self, name, default=0.0):
        arg = self.get_argument(name, default)
        if self.__validate_is_number(arg):
            return float(arg)
        return default

    def get_decimal_arg(self, name, default=0.0):
        arg = self.get_argument(name, default)
        if self.__validate_is_number(arg):
            return Decimal(arg)
        if default is None:
            return None
        return Decimal(default)

    def get_int_arg(self, name, default=0):
        arg = self.get_argument(name, default)
        if self.__validate_is_number(arg):
            # int(float(...)) so that values such as "1.5" or "1e3", which
            # pass the numeric validation above, truncate instead of raising
            # ValueError from int(str).
            return int(float(arg))
        return default

    def get_boolean_arg(self, name, default=False):
        arg = self.get_argument(name, "false" if not default else "true")
        return arg is not None and arg in ['true', '1', 't', 'y', 'yes',
                                           'True', 'T', 'Y', 'Yes', True]

    @staticmethod
    def _parse_datetime(time_str, seconds_divisor=1):
        """Parse an ISO-8601 UTC timestamp ("%Y-%m-%dT%H:%M:%SZ") or an
        integer epoch value into a timezone-aware UTC datetime.

        Epoch values are divided by `seconds_divisor` (1000 for millisecond
        epochs) using floor division, matching the original Python 2
        integer-division behavior.
        """
        try:
            dt = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC)
        except ValueError:
            dt = datetime.utcfromtimestamp(int(time_str) // seconds_divisor).replace(tzinfo=UTC)
        return dt

    def get_datetime_arg(self, name, default=None):
        time_str = self.get_argument(name, default=default)
        if time_str == default:
            return default
        return self._parse_datetime(time_str)

    def get_apply_seasonal_cycle_filter(self, default=True):
        return self.get_boolean_arg(RequestParameters.SEASONAL_CYCLE_FILTER, default=default)

    def get_max_lat(self, default=Decimal(90)):
        return self.get_decimal_arg("maxLat", default)

    def get_min_lat(self, default=Decimal(-90)):
        return self.get_decimal_arg("minLat", default)

    def get_max_lon(self, default=Decimal(180)):
        return self.get_decimal_arg("maxLon", default)

    def get_min_lon(self, default=Decimal(-180)):
        return self.get_decimal_arg("minLon", default)

    def get_bounding_polygon(self):
        """Closed shapely Polygon built from the 'b' parameter
        ("west,south,east,north")."""
        west, south, east, north = [float(b) for b in self.get_argument("b").split(",")]
        polygon = Polygon([(west, south), (east, south), (east, north), (west, north), (west, south)])
        return polygon

    def get_dataset(self):
        """List of dataset shortnames from the comma-delimited 'ds' parameter.

        Raises Exception("Invalid shortname") when the parameter is missing
        or malformed (previously a missing 'ds' fell through to
        None.split(",") and raised AttributeError).
        """
        ds = self.get_argument(RequestParameters.DATASET, None)
        if not self.__validate_is_shortname(ds):
            raise Exception("Invalid shortname")
        return ds.split(",")

    def get_environment(self):
        """Deployment environment, explicit via 'env' or inferred from the
        CORS Origin header; one of DEV/SIT/UAT/PROD or None."""
        env = self.get_argument(RequestParameters.ENVIRONMENT, None)
        if env is None and "Origin" in self.requestHandler.request.headers:
            origin = self.requestHandler.request.headers["Origin"]
            env = {
                "http://localhost:63342": "DEV",
                "https://sealevel.uat.earthdata.nasa.gov": "UAT",
                "https://sealevel.sit.earthdata.nasa.gov": "SIT",
                "https://sealevel.earthdata.nasa.gov": "PROD",
            }.get(origin)
        if env not in ("DEV", "SIT", "UAT", "PROD", None):
            raise Exception("Invalid Environment")
        return env

    def get_start_time(self):
        return self.get_int_arg(RequestParameters.START_TIME, 0)

    def get_end_time(self):
        return self.get_int_arg(RequestParameters.END_TIME, -1)

    def get_start_year(self):
        return self.get_int_arg(RequestParameters.START_YEAR, 0)

    def get_end_year(self):
        return self.get_int_arg(RequestParameters.END_YEAR, -1)

    def get_clim_month(self):
        return self.get_int_arg(RequestParameters.CLIM_MONTH, -1)

    def get_start_datetime(self):
        return self._parse_datetime(self.get_argument(RequestParameters.START_TIME))

    def get_end_datetime(self):
        return self._parse_datetime(self.get_argument(RequestParameters.END_TIME))

    def get_start_datetime_ms(self):
        # Same as get_start_datetime but epoch values are in milliseconds.
        return self._parse_datetime(self.get_argument(RequestParameters.START_TIME), seconds_divisor=1000)

    def get_end_datetime_ms(self):
        # Same as get_end_datetime but epoch values are in milliseconds.
        return self._parse_datetime(self.get_argument(RequestParameters.END_TIME), seconds_divisor=1000)

    def get_start_row(self):
        return self.get_int_arg(RequestParameters.START_ROW, 0)

    def get_row_count(self):
        return self.get_int_arg(RequestParameters.ROW_COUNT, 10)

    def get_content_type(self):
        return self.get_argument(RequestParameters.OUTPUT, "JSON")

    def get_apply_low_pass_filter(self, default=True):
        return self.get_boolean_arg(RequestParameters.APPLY_LOW_PASS, default)

    def get_low_pass_low_cut(self, default=12):
        return self.get_float_arg(RequestParameters.LOW_CUT, default)

    def get_low_pass_order(self, default=9):
        return self.get_float_arg(RequestParameters.ORDER, default)

    def get_include_meta(self):
        return self.get_boolean_arg("includemeta", True)

    def get_plot_series(self, default="mean"):
        return self.get_argument(RequestParameters.PLOT_SERIES, default=default)

    def get_plot_type(self, default="default"):
        return self.get_argument(RequestParameters.PLOT_TYPE, default=default)

    def get_spark_cfg(self, default=SparkConfig.DEFAULT):
        """Parse the 'spark' parameter "master,numExecutors,numPartitions".

        Returns (master, nexecs, nparts) where a local master is expanded to
        "local[n]".  Raises ValueError on any malformed or out-of-range value.
        """
        arg = self.get_argument(RequestParameters.SPARK_CFG, default)
        try:
            master, nexecs, nparts = arg.split(',')
        except ValueError:
            # Narrowed from a bare except: only a wrong field count can fail
            # the unpacking above.
            raise ValueError('Invalid spark configuration: %s' % arg)
        if master not in ("local", "yarn", "mesos"):
            raise ValueError('Invalid spark master: %s' % master)
        nexecs = int(nexecs)
        if (nexecs < 1) or (nexecs > SparkConfig.MAX_NUM_EXECS):
            raise ValueError('Invalid number of Spark executors: %d (must be between 1 and %d)' % (
                nexecs, SparkConfig.MAX_NUM_EXECS))
        nparts = int(nparts)
        if (nparts < 1) or (nparts > SparkConfig.MAX_NUM_PARTS):
            raise ValueError('Invalid number of Spark data partitions: %d (must be between 1 and %d)' % (
                nparts, SparkConfig.MAX_NUM_PARTS))
        if master == "local":
            master = "local[%d]" % nexecs
        return master, nexecs, nparts
+
class NexusResults:
    """Container for computed algorithm results, their metadata and stats.

    Spatial/temporal bounds come either from a compute-options object or
    from keyword arguments (minLat, maxLat, minLon, maxLon, ds, startTime,
    endTime); they are folded into the metadata via extendMeta().
    """

    def __init__(self, results=None, meta=None, stats=None, computeOptions=None, status_code=200, **args):
        self.status_code = status_code
        self.__results = results
        self.__meta = meta if meta is not None else {}
        self.__stats = stats if stats is not None else {}
        self.__computeOptions = computeOptions
        if computeOptions is not None:
            # Prefer the request's compute options when available.
            self.__minLat = computeOptions.get_min_lat()
            self.__maxLat = computeOptions.get_max_lat()
            self.__minLon = computeOptions.get_min_lon()
            self.__maxLon = computeOptions.get_max_lon()
            self.__ds = computeOptions.get_dataset()
            self.__startTime = computeOptions.get_start_time()
            self.__endTime = computeOptions.get_end_time()
        else:
            self.__minLat = args.get("minLat", -90.0)
            self.__maxLat = args.get("maxLat", 90.0)
            self.__minLon = args.get("minLon", -180.0)
            self.__maxLon = args.get("maxLon", 180.0)
            self.__ds = args.get("ds")
            self.__startTime = args.get("startTime")
            self.__endTime = args.get("endTime")

        self.extendMeta(minLat=self.__minLat,
                        maxLat=self.__maxLat,
                        minLon=self.__minLon,
                        maxLon=self.__maxLon,
                        ds=self.__ds,
                        startTime=self.__startTime,
                        endTime=self.__endTime)

    def computeOptions(self):
        return self.__computeOptions

    def results(self):
        return self.__results

    def meta(self):
        return self.__meta

    def stats(self):
        return self.__stats

    def _extendMeta(self, meta, minLat, maxLat, minLon, maxLon, ds, startTime, endTime):
        """Annotate a single metadata dict in place with dataset name, bounds
        and time range; returns the dict (or None when meta is None)."""
        if meta is None:
            return None

        meta["shortName"] = ds
        # Compose a display label only when both pieces are present.
        if "title" in meta and "units" in meta:
            meta["label"] = "%s (%s)" % (meta["title"], meta["units"])
        meta["bounds"] = {
            "east": maxLon,
            "west": minLon,
            "north": maxLat,
            "south": minLat
        }
        meta["time"] = {
            "start": startTime,
            "stop": endTime
        }
        return meta

    def extendMeta(self, minLat, maxLat, minLon, maxLon, ds, startTime, endTime):
        """Fold bounds/time/dataset info into the stored metadata, handling
        both single values and parallel lists of datasets/metadata."""
        if self.__meta is None:
            return None
        if isinstance(ds, list):
            for i, shortName in enumerate(ds):
                # Pair each dataset with its own metadata entry when metadata
                # is a parallel list; otherwise reuse the single dict for all
                # datasets (risky: later datasets overwrite earlier fields).
                subMeta = self.__meta[i] if isinstance(self.__meta, list) else self.__meta
                self._extendMeta(subMeta, minLat, maxLat, minLon, maxLon, shortName, startTime, endTime)
        else:
            if isinstance(self.__meta, list):
                # Single dataset but a list of metadata entries: keep the first.
                self.__meta = self.__meta[0]
            self._extendMeta(self.__meta, minLat, maxLat, minLon, maxLon, ds, startTime, endTime)

    def toJson(self):
        data = {
            'meta': self.__meta,
            'data': self.__results,
            'stats': self.__stats
        }
        return json.dumps(data, indent=4, cls=CustomEncoder)

    def toImage(self):
        raise Exception("Not implemented for this result type")
+
+
class CustomEncoder(json.JSONEncoder):
    """JSON encoder that understands numpy arrays and scalars, Decimal,
    datetime, and the numpy masked constant.

    ndarrays become nested lists; numpy scalars become the equivalent native
    Python value; Decimal and datetime are stringified.
    """

    # numpy scalar types that map cleanly onto JSON-serializable Python
    # values via .item().  Excluded on purpose:
    #   np.bytes_ / np.complex* / np.datetime64 -- their native Python
    #   counterparts are not JSON serializable;
    #   np.object_ -- already evaluated as a native Python object.
    _NUMPY_SCALAR_TYPES = (
        np.bool_,
        np.float16,
        np.float32,
        np.float64,
        np.int8,
        np.int16,
        np.int32,
        np.int64,
        np.str_,
        np.uint8,
        np.uint16,
        np.uint32,
        np.uint64,
        np.void,
    )
    # np.float128 is unavailable on some platforms/builds (e.g. Windows),
    # so resolve it defensively instead of referencing it directly.
    _LONG_DOUBLE = getattr(np, 'float128', None)

    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, self._NUMPY_SCALAR_TYPES):
            return obj.item()
        elif self._LONG_DOUBLE is not None and isinstance(obj, self._LONG_DOUBLE):
            # Down-cast: extended precision has no native JSON representation.
            return obj.astype(np.float64).item()
        elif isinstance(obj, Decimal):
            return str(obj)
        elif isinstance(obj, datetime):
            return str(obj)
        elif obj is np.ma.masked:
            # np.nan (np.NaN was removed in NumPy 2.0)
            return str(np.nan)
        # Let the base class default method raise the TypeError
        return json.JSONEncoder.default(self, obj)
+
+
__CACHE = {}


def cached(ttl=60000):
    """Decorator factory caching a handler method's result for `ttl` ms.

    The wrapped method must have the signature (self, computeOptions, **args).
    Passing the request parameter 'nocached=true' forces recomputation.

    NOTE(review): the cache key is derived from the wrapped function only,
    not from the request parameters -- different requests hitting the same
    handler within the TTL window receive the same cached result.
    """

    def _hash_function_signature(func):
        # Key on the function's argspec and repr.  getargspec was removed in
        # Python 3.11; prefer getfullargspec when available.  Encode before
        # hashing so this also works on Python 3 (md5 requires bytes).
        argspec = getattr(inspect, 'getfullargspec', inspect.getargspec)(func)
        hash_object = hashlib.md5((str(argspec) + str(func)).encode('utf-8'))
        return hash_object.hexdigest()

    def _now():
        # Current wall-clock time in whole milliseconds.
        return int(round(time.time() * 1000))

    def _expired(t):
        return t is None or _now() - t > ttl

    def _cached_decorator(func):
        # The key never changes for a given function; compute it once at
        # decoration time instead of on every call.
        key = _hash_function_signature(func)

        def func_wrapper(self, computeOptions, **args):
            force = computeOptions.get_boolean_arg("nocached", default=False)
            entry = __CACHE.get(key)
            if force or entry is None or _expired(entry["time"]):
                entry = {
                    "time": _now(),
                    "result": func(self, computeOptions, **args)
                }
                __CACHE[key] = entry
            return entry["result"]

        return func_wrapper

    return _cached_decorator
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/client/README.md
----------------------------------------------------------------------
diff --git a/client/README.md b/client/README.md
new file mode 100644
index 0000000..e69de29

Reply via email to