http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/client/nexuscli/__init__.py
----------------------------------------------------------------------
diff --git a/client/nexuscli/__init__.py b/client/nexuscli/__init__.py
new file mode 100644
index 0000000..1275c23
--- /dev/null
+++ b/client/nexuscli/__init__.py
@@ -0,0 +1,9 @@
+"""
+Copyright (c) 2017 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+from nexuscli.nexuscli import TimeSeries
+from nexuscli.nexuscli import set_target
+from nexuscli.nexuscli import time_series
+from nexuscli.nexuscli import dataset_list
+from nexuscli.nexuscli import daily_difference_average

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/client/nexuscli/nexuscli.py
----------------------------------------------------------------------
diff --git a/client/nexuscli/nexuscli.py b/client/nexuscli/nexuscli.py
new file mode 100644
index 0000000..2643713
--- /dev/null
+++ b/client/nexuscli/nexuscli.py
@@ -0,0 +1,198 @@
+"""
+Copyright (c) 2017 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+
+This module provides a native python client interface to the NEXUS 
(https://github.com/dataplumber/nexus) 
+webservice API.
+
+Usage:
+
+    import nexuscli
+    
+    nexuscli.set_target("http://nexus-webapp:8083";)
+    nexuscli.dataset_list()
+    
+"""
+import requests
+import numpy as np
+from datetime import datetime
+from collections import namedtuple, OrderedDict
+from pytz import UTC
+
+__pdoc__ = {}
+TimeSeries = namedtuple('TimeSeries', ('dataset', 'time', 'mean', 
'standard_deviation', 'count', 'minimum', 'maximum'))
+TimeSeries.__doc__ = '''\
+An object containing Time Series arrays.
+'''
+__pdoc__['TimeSeries.dataset'] = "Name of the Dataset"
+__pdoc__['TimeSeries.time'] = "`numpy` array containing times as `datetime` 
objects"
+__pdoc__['TimeSeries.mean'] = "`numpy` array containing means"
+__pdoc__['TimeSeries.standard_deviation'] = "`numpy` array containing standard 
deviations"
+__pdoc__['TimeSeries.count'] = "`numpy` array containing counts"
+__pdoc__['TimeSeries.minimum'] = "`numpy` array containing minimums"
+__pdoc__['TimeSeries.maximum'] = "`numpy` array containing maximums"
+
+ISO_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
+
+target = 'http://localhost:8083'
+
+session = requests.session()
+
+
+def set_target(url):
+    """
+    Set the URL for the NEXUS webapp endpoint.  
+    
+    __url__ URL for NEXUS webapp endpoint   
+    __return__ None
+    """
+    global target
+    target = url
+
+
+def dataset_list():
+    """
+    Get a list of datasets and the start and end time for each.
+    
+    __return__ list of datasets. Each entry in the list contains `shortname`, 
`start`, and `end`
+    """
+    response = session.get("{}/list".format(target))
+    data = response.json()
+
+    list_response = []
+    for dataset in data:
+        dataset['start'] = datetime.utcfromtimestamp(dataset['start'] / 
1000).strftime(ISO_FORMAT)
+        dataset['end'] = datetime.utcfromtimestamp(dataset['end'] / 
1000).strftime(ISO_FORMAT)
+
+        ordered_dict = OrderedDict()
+        ordered_dict['shortName'] = dataset['shortName']
+        ordered_dict['start'] = dataset['start']
+        ordered_dict['end'] = dataset['end']
+        list_response.append(ordered_dict)
+
+    return list_response
+
+
+def daily_difference_average(dataset, bounding_box, start_datetime, 
end_datetime):
+    """
+    Generate an anomaly Time series for a given dataset, bounding box, and 
timeframe.
+    
+    __dataset__ Name of the dataset as a String  
+    __bounding_box__ Bounding box for area of interest as a 
`shapely.geometry.polygon.Polygon`  
+    __start_datetime__ Start time as a `datetime.datetime`  
+    __end_datetime__ End time as a `datetime.datetime`  
+    
+    __return__ List of `nexuscli.nexuscli.TimeSeries` namedtuples
+    """
+    url = "{}/dailydifferenceaverage_spark?".format(target)
+
+    params = {
+        'dataset': dataset,
+        'climatology': "{}_CLIM".format(dataset),
+        'b': ','.join(str(b) for b in bounding_box.bounds),
+        'startTime': start_datetime.strftime(ISO_FORMAT),
+        'endTime': end_datetime.strftime(ISO_FORMAT),
+    }
+
+    response = session.get(url, params=params)
+    response.raise_for_status()
+    response = response.json()
+
+    data = np.array(response['data']).flatten()
+
+    assert len(data) > 0, "No data found in {} between {} and {} for Datasets 
{}.".format(bounding_box.wkt,
+                                                                               
           start_datetime.strftime(
+                                                                               
               ISO_FORMAT),
+                                                                               
           end_datetime.strftime(
+                                                                               
               ISO_FORMAT),
+                                                                               
           dataset)
+
+    time_series_result = []
+
+    key_to_index = {k: x for x, k in enumerate(data[0].keys())}
+
+    time_series_data = np.array([tuple(each.values()) for each in [entry for 
entry in data]])
+
+    if len(time_series_data) > 0:
+        time_series_result.append(
+            TimeSeries(
+                dataset=dataset,
+                
time=np.array([datetime.utcfromtimestamp(t).replace(tzinfo=UTC) for t in
+                               time_series_data[:, key_to_index['time']]]),
+                mean=time_series_data[:, key_to_index['mean']],
+                standard_deviation=time_series_data[:, key_to_index['std']],
+                count=None,
+                minimum=None,
+                maximum=None,
+            )
+        )
+
+    return time_series_result
+
+
+def time_series(datasets, bounding_box, start_datetime, end_datetime, 
spark=False):
+    """
+    Send a request to NEXUS to calculate a time series.
+    
+    __datasets__ Sequence (max length 2) of the name of the dataset(s)  
+    __bounding_box__ Bounding box for area of interest as a 
`shapely.geometry.polygon.Polygon`  
+    __start_datetime__ Start time as a `datetime.datetime`  
+    __end_datetime__ End time as a `datetime.datetime`  
+    __spark__ Optionally use spark. Default: `False`
+    
+    __return__ List of `nexuscli.nexuscli.TimeSeries` namedtuples
+    """
+
+    if isinstance(datasets, str):
+        datasets = [datasets]
+
+    assert 0 < len(datasets) <= 2, "datasets must be a sequence of 1 or 2 
items"
+
+    params = {
+        'ds': ','.join(datasets),
+        'b': ','.join(str(b) for b in bounding_box.bounds),
+        'startTime': start_datetime.strftime(ISO_FORMAT),
+        'endTime': end_datetime.strftime(ISO_FORMAT),
+    }
+
+    if spark:
+        url = "{}/timeSeriesSpark?".format(target)
+        params['spark'] = "mesos,16,32"
+    else:
+        url = "{}/stats?".format(target)
+
+    response = session.get(url, params=params)
+    response.raise_for_status()
+    response = response.json()
+
+    data = np.array(response['data']).flatten()
+
+    assert len(data) > 0, "No data found in {} between {} and {} for Datasets 
{}.".format(bounding_box.wkt,
+                                                                               
           start_datetime.strftime(
+                                                                               
               ISO_FORMAT),
+                                                                               
           end_datetime.strftime(
+                                                                               
               ISO_FORMAT),
+                                                                               
           datasets)
+
+    time_series_result = []
+
+    for i in range(0, len(response['meta'])):
+        key_to_index = {k: x for x, k in enumerate(data[0].keys())}
+
+        time_series_data = np.array([tuple(each.values()) for each in [entry 
for entry in data if entry['ds'] == i]])
+
+        if len(time_series_data) > 0:
+            time_series_result.append(
+                TimeSeries(
+                    dataset=response['meta'][i]['shortName'],
+                    
time=np.array([datetime.utcfromtimestamp(t).replace(tzinfo=UTC) for t in
+                                   time_series_data[:, key_to_index['time']]]),
+                    mean=time_series_data[:, key_to_index['mean']],
+                    standard_deviation=time_series_data[:, 
key_to_index['std']],
+                    count=time_series_data[:, key_to_index['cnt']],
+                    minimum=time_series_data[:, key_to_index['min']],
+                    maximum=time_series_data[:, key_to_index['max']],
+                )
+            )
+
+    return time_series_result

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/client/nexuscli/test/__init__.py
----------------------------------------------------------------------
diff --git a/client/nexuscli/test/__init__.py b/client/nexuscli/test/__init__.py
new file mode 100644
index 0000000..82079a0
--- /dev/null
+++ b/client/nexuscli/test/__init__.py
@@ -0,0 +1,4 @@
+"""
+Copyright (c) 2017 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/client/nexuscli/test/nexuscli_test.py
----------------------------------------------------------------------
diff --git a/client/nexuscli/test/nexuscli_test.py 
b/client/nexuscli/test/nexuscli_test.py
new file mode 100644
index 0000000..0beea95
--- /dev/null
+++ b/client/nexuscli/test/nexuscli_test.py
@@ -0,0 +1,30 @@
+"""
+Copyright (c) 2017 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import unittest
+from datetime import datetime
+
+from shapely.geometry import box
+
+import nexuscli
+
+
+class TestCli(unittest.TestCase):
+    def test_time_series(self):
+        ts = nexuscli.time_series(("AVHRR_OI_L4_GHRSST_NCEI", 
"MEASURES_SLA_JPL_1603"), box(-150, 45, -120, 60),
+                                  datetime(2016, 1, 1), datetime(2016, 12, 31))
+
+        self.assertEqual(2, len(ts))
+
+    def test_list(self):
+        ds_list = nexuscli.dataset_list()
+
+        print(ds_list)
+        self.assertTrue(len(ds_list) > 0)
+
+    def test_daily_difference_average(self):
+        ts = nexuscli.daily_difference_average("AVHRR_OI_L4_GHRSST_NCEI", 
box(-150, 45, -120, 60),
+                                               datetime(2013, 1, 1), 
datetime(2014, 12, 31))
+
+        self.assertEqual(1, len(ts))

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/client/requirements.txt
----------------------------------------------------------------------
diff --git a/client/requirements.txt b/client/requirements.txt
new file mode 100644
index 0000000..a892ee6
--- /dev/null
+++ b/client/requirements.txt
@@ -0,0 +1,14 @@
+certifi==2017.4.17
+chardet==3.0.4
+idna==2.5
+Mako==1.0.7
+Markdown==2.4.1
+MarkupSafe==1.0
+nexuscli==1.0
+numpy==1.13.1
+pdoc==0.3.2
+Pygments==2.2.0
+pytz==2017.2
+requests==2.18.1
+Shapely==1.5.17.post1
+urllib3==1.21.1

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/client/setup.py
----------------------------------------------------------------------
diff --git a/client/setup.py b/client/setup.py
new file mode 100644
index 0000000..dc8ba23
--- /dev/null
+++ b/client/setup.py
@@ -0,0 +1,36 @@
+"""
+Copyright (c) 2017 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+
+from setuptools import setup, find_packages
+
+__version__ = '1.0'
+
+setup(
+    name="nexuscli",
+    version=__version__,
+    packages=find_packages(),
+    url="https://github.jpl.nasa.gov/thuang/nexus";,
+
+    author="Team Nexus",
+
+    description="NEXUS Client Module",
+    long_description=open('README.md').read(),
+
+    platforms='any',
+
+    install_requires=[
+        'requests',
+        'shapely',
+        'numpy',
+        'pytz'
+    ],
+
+    classifiers=[
+        'Development Status :: 1 - Pre-Alpha',
+        'Intended Audience :: Developers',
+        'Operating System :: OS Independent',
+        'Programming Language :: Python :: 2.7',
+    ]
+)

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/.gitignore
----------------------------------------------------------------------
diff --git a/climatology/.gitignore b/climatology/.gitignore
new file mode 100644
index 0000000..8ff7b03
--- /dev/null
+++ b/climatology/.gitignore
@@ -0,0 +1,11 @@
+.DS_Store
+
+dist
+
+build
+
+*.egg-info
+
+*_f.so*
+
+*.nc
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/ClimatologySpark.py
----------------------------------------------------------------------
diff --git a/climatology/clim/ClimatologySpark.py 
b/climatology/clim/ClimatologySpark.py
new file mode 100755
index 0000000..4b01466
--- /dev/null
+++ b/climatology/clim/ClimatologySpark.py
@@ -0,0 +1,455 @@
+"""
+ClimatologySpark.py
+
+Compute a multi-epoch (multi-day) climatology from daily SST Level-3 grids.
+
+Simple code to be run on Spark cluster, or using multi-core parallelism on 
single machine.
+
+"""
+
+import sys, os, urlparse, urllib, re, time
+import numpy as N
+import matplotlib
+matplotlib.use('Agg')
+import matplotlib.pylab as M
+
+from variables import getVariables, close
+from cache import retrieveFile, CachePath
+from split import fixedSplit, splitByNDays
+from netCDF4 import Dataset, default_fillvals
+from pathos.multiprocessing import ProcessingPool as Pool
+from plotlib import imageMap, makeMovie
+
+from spatialFilter import spatialFilter
+from gaussInterp import gaussInterp         # calls into Fortran version 
gaussInterp_f.so
+#from gaussInterp_slow import gaussInterp_slow as gaussInterp       # pure 
python, slow debuggable version
+
+#import pyximport; pyximport.install(pyimport=False)
+#from gaussInterp import gaussInterp, gaussInterp_      # attempted cython 
version, had issues
+
+VERBOSE = 1
+
+# Possible execution modes
+# Multicore & cluster modes use pathos pool.map(); Spark mode uses PySpark 
cluster.
+ExecutionModes = ['sequential', 'multicore', 'cluster', 'spark']
+
+# SST L3m 4.6km Metadata
+# SST calues are scaled integers in degrees Celsius, lon/lat is 8640 x 4320
+# Variable = 'sst', Mask = 'qual_sst', Coordinates = ['lon', 'lat']
+
+# Generate algorithmic name for N-day Climatology product
+SSTClimatologyTemplate = 
'SST.L3.Global.Clim.%(period)s.%(date)s.%(version)s.nc'  #??
+
+# Simple mask and average functions to get us started, then add gaussian 
interpolation.
+# MODIS L3 SST product, qual_sst is [-1, 2] - =0 is best data, can add =1 for 
better coverage
+#def qcMask(var, mask): return N.ma.array(var, mask=N.ma.make_mask(mask == 0))
+
+def qcMask(var, mask): return N.ma.masked_where(mask != 0, var)
+#def qcMask(var, mask): return N.ma.masked_where(mask < 0, var)
+
+def splitModisSst(seq, n):
+    for chunk in splitByNDays(seq, n, re.compile(r'(...).L3m')):
+        yield chunk
+
+def mean(a): return N.ma.mean(a, axis=0)
+
+AveragingFunctions = {'pixelMean': mean, 'gaussInterp': gaussInterp, 
'spatialFilter': spatialFilter}
+
+PixelMeanConfig = {'name': 'pixelMean'}
+
+GaussInterpConfig = {'name': 'gaussInterp',
+                     'latGrid': None, 'lonGrid': None,         # None means 
use input lat/lon grid
+                     'wlat': 3, 'wlon': 3,
+                     'slat': 0.15, 'slon': 0.15, 'stime': 1,
+                     'vfactor': -0.6931, 'missingValue': 
default_fillvals['f4']}
+
+GaussInterpConfig1a = {'name': 'gaussInterp',
+                     'latGrid': None, 'lonGrid': None,         # None means 
use input lat/lon grid
+                     'wlat': 0.30, 'wlon': 0.30,
+                     'slat': 0.15, 'slon': 0.15, 'stime': 1,
+                     'vfactor': -0.6931, 'missingValue': 
default_fillvals['f4']}
+
+GaussInterpConfig1b = {'name': 'gaussInterp',
+                     'latGrid': None, 'lonGrid': None,         # None means 
use input lat/lon grid
+                     'wlat': 0.08, 'wlon': 0.08,
+                     'slat': 0.15, 'slon': 0.15, 'stime': 1,
+                     'vfactor': -0.6931, 'missingValue': 
default_fillvals['f4']}
+
+GaussInterpConfig2 = {'name': 'gaussInterp',
+                     'latGrid': (89.5, -89.5, -0.25), 'lonGrid': (-180., 179., 
0.25),
+                     'wlat': 2., 'wlon': 2.,
+                     'slat': 0.15, 'slon': 0.15, 'stime': 1,
+                     'vfactor': -0.6931, 'missingValue': 
default_fillvals['f4']}
+
+FilterGaussian = [[1, 2, 1], [2, 4, 2], [1, 2, 1]]    # divide by 16
+FilterLowPass = [[1, 1, 1], [1, 1, 1], [1, 1, 1]]     # divide by 9
+
+SpatialFilterConfig1 = {'name': 'spatialFilter', 'normalization': 16., 
+                        'spatialFilter': N.array(FilterGaussian, 
dtype=N.int32),
+                        'missingValue': default_fillvals['f4']}
+
+SpatialFilterConfig2 = {'name': 'spatialFilter', 'normalization': 9.,
+                        'spatialFilter': N.array(FilterLowPass, dtype=N.int32),
+                        'missingValue': default_fillvals['f4']}
+
+# Directory to cache retrieved files in
+CachePath = '~/cache'
+
+
+def climByAveragingPeriods(urls,              # list of (daily) granule URLs 
for a long time period (e.g. a year)
+                    nEpochs,                  # compute a climatology for 
every N epochs (days) by 'averaging'
+                    nWindow,                  # number of epochs in window 
needed for averaging
+                    nNeighbors,               # number of neighbors on EACH 
side in lat/lon directions to use in averaging
+                    variable,                 # name of primary variable in 
file
+                    mask,                     # name of mask variable
+                    coordinates,              # names of coordinate arrays to 
read and pass on (e.g. 'lat' and 'lon')
+                    splitFn=splitModisSst,    # split function to use to 
partition the input URL list
+                    maskFn=qcMask,            # mask function to compute mask 
from mask variable
+                    averager='pixelMean',     # averaging function to use, one 
of ['pixelMean', 'gaussInterp', 'spatialFilter']
+                    averagingConfig={},       # dict of parameters to control 
the averaging function (e.g. gaussInterp)
+                    optimization='fortran',   # optimization mode (fortran or 
cython)
+                    mode='sequential',        # Map across time periods of 
N-days for concurrent work, executed by:
+                                              # 'sequential' map, 'multicore' 
using pool.map(), 'cluster' using pathos pool.map(),
+                                              # or 'spark' using PySpark
+                    numNodes=1,               # number of cluster nodes to use
+                    nWorkers=4,               # number of parallel workers per 
node
+                    averagingFunctions=AveragingFunctions,    # dict of 
possible averaging functions
+                    legalModes=ExecutionModes,  # list of possible execution 
modes
+                    cachePath=CachePath       # directory to cache retrieved 
files in
+                   ):
+    '''Compute a climatology every N days by applying a mask and averaging 
function.
+Writes the averaged variable grid, attributes of the primary variable, and the 
coordinate arrays in a dictionary.
+***Assumption:  This routine assumes that the N grids will fit in memory.***
+    '''
+    if averagingConfig['name'] == 'gaussInterp':
+        averagingConfig['wlat'] = nNeighbors
+        averagingConfig['wlon'] = nNeighbors
+    try:
+        averageFn = averagingFunctions[averager]
+    except:
+        print >>sys.stderr, 'climatology: Error, Averaging function must be 
one of: %s' % str(averagingFunctions)
+        sys.exit(1)
+
+    urlSplits = [s for s in splitFn(urls, nEpochs)]
+
+    def climsContoured(urls, plot=None, fillValue=default_fillvals['f4'], 
format='NETCDF4', cachePath=cachePath):
+        n = len(urls)
+        if VERBOSE: print >>sys.stderr, urls
+
+        var = climByAveraging(urls, variable, mask, coordinates, maskFn, 
averageFn, averagingConfig, optimization, cachePath)
+
+        fn = os.path.split(urls[0])[1]
+        inFile = os.path.join(cachePath, fn)
+        method = averagingConfig['name']
+        fn = os.path.splitext(fn)[0]
+        day = fn[5:8]
+        nDays = int(var['time'][0])
+
+        if 'wlat' in averagingConfig:
+            wlat = averagingConfig['wlat']
+        else:
+            wlat = 1
+        if int(wlat) == wlat:
+            outFile = 'A%s.L3m_%dday_clim_sst_4km_%s_%dnbrs.nc' % (day, nDays, 
method, int(wlat))    # mark each file with first day in period
+        else:
+            outFile = 'A%s.L3m_%dday_clim_sst_4km_%s_%4.2fnbrs.nc' % (day, 
nDays, method, wlat)    # mark each file with first day in period
+
+        outFile = writeOutNetcdfVars(var, variable, mask, coordinates, inFile, 
outFile, fillValue, format)
+
+        if plot == 'contour':
+            figFile = contourMap(var, variable, coordinates, n, outFile)
+        elif plot == 'histogram':
+#            figFile = histogram(var, variable, n, outFile)
+            figFile = None
+        else:
+            figFile = None
+        return (outFile, figFile)
+
+    if mode == 'sequential':
+        results = map(climsContoured, urlSplits)
+    elif mode == 'multicore':
+        pool = Pool(nWorkers)
+        results = pool.map(climsContoured, urlSplits)        
+    elif mode == 'cluster':
+        pass
+    elif mode == 'spark':
+        pass
+
+    return results
+
+
+def climByAveraging(urls,                    # list of granule URLs for a time 
period
+                    variable,                # name of primary variable in file
+                    mask,                    # name of mask variable
+                    coordinates,             # names of coordinate arrays to 
read and pass on (e.g. 'lat' and 'lon')
+                    maskFn=qcMask,           # mask function to compute mask 
from mask variable
+                    averageFn=mean,          # averaging function to use
+                    averagingConfig={},      # parameters to control averaging 
function (e.g. gaussInterp)
+                    optimization='fortran',  # optimization mode (fortran or 
cython)
+                    cachePath=CachePath
+                   ):
+    '''Compute a climatology over N arrays by applying a mask and averaging 
function.
+Returns the averaged variable grid, attributes of the primary variable, and 
the coordinate arrays in a dictionary.
+***Assumption:  This routine assumes that the N grids will fit in memory.***
+    '''
+    n = len(urls)
+    varList = [variable, mask]
+    var = {}
+    vtime = N.zeros((n,), N.int32)
+
+    for i, url in enumerate(urls):
+        try:
+            path = retrieveFile(url, cachePath)
+            fn = os.path.split(path)[1]
+            vtime[i] = int(fn[5:8])     # KLUDGE: extract DOY from filename
+        except:
+            print >>sys.stderr, 'climByAveraging: Error, continuing without 
file %s' % url
+            accum[i] = emptyVar
+            continue
+        if path is None: continue
+        print >>sys.stderr, 'Read variables and mask ...'
+        try:
+            var, fh = getVariables(path, varList, arrayOnly=True, order='F', 
set_auto_mask=False)   # return dict of variable objects by name
+        except:
+            print >>sys.stderr, 'climByAveraging: Error, cannot read file %s' 
% path
+            accum[i] = emptyVar
+            continue
+        if i == 0:
+            dtype = var[variable].dtype
+            if 'int' in dtype.name: dtype = N.float32
+            shape = (n,) + var[variable].shape
+            accum = N.ma.empty(shape, dtype, order='F')
+            emptyVar = N.array(N.ma.masked_all(var[variable].shape, dtype), 
order='F')  # totally masked variable array for missing or bad file reads
+
+            print >>sys.stderr, 'Read coordinates ...'
+            var, fh = getVariables(path, coordinates, var, arrayOnly=True, 
order='F')   # read coordinate arrays and add to dict
+
+        var[variable] = maskFn(var[variable], var[mask])     # apply quality 
mask variable to get numpy MA, turned off masking done by netCDF4 library
+#       var[variable] = var[variable][:]
+
+        # Echo variable range for sanity check
+        vals = var[variable].compressed()
+        print >>sys.stderr, 'Variable Range: min, max:', vals.min(), vals.max()
+
+        # Plot input grid
+#        figFile = histogram(vals, variable, n, os.path.split(path)[1])
+#        figFile = contourMap(var, variable, coordinates[1:], n, 
os.path.split(path)[1])
+
+        accum[i] = var[variable]                        # accumulate N arrays 
for 'averaging"""
+#        if i != 0 and i+1 != n: close(fh)              # REMEMBER: closing fh 
loses netCDF4 in-memory data structures
+        close(fh)
+
+    coordinates = ['time'] + coordinates               # add constructed time 
(days) as first coordinate
+    var['time'] = vtime
+
+    if averagingConfig['name'] == 'pixelMean':
+        print >>sys.stderr, 'Doing Pixel Average over %d grids ...' % n
+        start = time.time()
+        avg = averageFn(accum)                         # call pixel averaging 
function
+        end = time.time()
+        print >>sys.stderr, 'pixelMean execution time:', (end - start)
+        outlat = var[coordinates[1]].astype(N.float32)[:]
+        outlon = var[coordinates[2]].astype(N.float32)[:]
+    elif averagingConfig['name'] == 'gaussInterp':
+        print >>sys.stderr, 'Doing Gaussian Interpolation over %d grids ...' % 
n
+        var[variable] = accum
+        c = averagingConfig
+        latGrid = c['latGrid']; lonGrid = c['lonGrid']
+        if latGrid is not None and lonGrid is not None:
+            outlat = N.arange(latGrid[0], latGrid[1]+latGrid[2], latGrid[2], 
dtype=N.float32, order='F')
+            outlon = N.arange(lonGrid[0], lonGrid[1]+lonGrid[2], lonGrid[2], 
dtype=N.float32, order='F')
+        else:
+            outlat = N.array(var[coordinates[1]], dtype=N.float32, order='F')
+            outlon = N.array(var[coordinates[2]], dtype=N.float32, order='F')
+        varNames = [variable] + coordinates
+        start = time.time()
+        avg, weight, status = \
+            gaussInterp(var, varNames, outlat, outlon, c['wlat'], c['wlon'],
+                        c['slat'], c['slon'], c['stime'], c['vfactor'], 
c['missingValue'],
+                        VERBOSE, optimization)
+        end = time.time()
+        var['outweight'] = weight.astype(N.float32)
+        print >>sys.stderr, 'gaussInterp execution time:', (end - start)
+    elif averagingConfig['name'] == 'spatialFilter':
+        print >>sys.stderr, 'Applying Spatial 3x3 Filter and then averaging 
over %d grids ...' % n
+        var[variable] = accum
+        c = averagingConfig
+        varNames = [variable] + coordinates
+        start = time.time()
+        avg, count, status = \
+            spatialFilter(var, varNames, c['spatialFilter'], 
c['normalization'], 
+                          c['missingValue'], VERBOSE, optimization)
+        end = time.time()
+        print >>sys.stderr, 'spatialFilter execution time:', (end - start)
+        outlat = var[coordinates[1]].astype(N.float32)[:]
+        outlon = var[coordinates[2]].astype(N.float32)[:]
+
+    var['out'+variable] = avg.astype(N.float32)            # return primary 
variable & mask arrays in dict
+    var['out'+mask] = N.ma.getmask(avg)
+    var['outlat'] = outlat
+    var['outlon'] = outlon
+    return var
+
+
+def writeOutNetcdfVars(var, variable, mask, coordinates, inFile, outFile, 
fillValue=None, format='NETCDF4'):
+    '''Construct output bundle of arrays with NetCDF dimensions and attributes.
+Output variables and attributes will have same names as the input file.
+    '''
+    din = Dataset(inFile, 'r')
+    dout = Dataset(outFile, 'w', format=format)
+    print >>sys.stderr, 'Writing %s ...' % outFile
+
+    # Transfer global attributes from input file
+    for a in din.ncattrs():
+        dout.setncattr(a, din.getncattr(a))
+
+    # Add dimensions and variables, copying data
+    coordDim = [dout.createDimension(coord, var['out'+coord].shape[0]) for 
coord in coordinates]    # here output lon, lat, etc.
+    for coord in coordinates:
+        v = dout.createVariable(coord, var['out'+coord].dtype, (coord,))
+        v[:] = var['out'+coord][:]
+    primaryVar = dout.createVariable(variable, var['out'+variable].dtype, 
coordinates, fill_value=fillValue)
+    primaryVar[:] = var['out'+variable][:]          # transfer array
+    maskVar = dout.createVariable(mask, 'i1', coordinates)
+    maskVar[:] = var['out'+mask].astype('i1')[:]
+
+    # Transfer variable attributes from input file
+    for k,v in dout.variables.iteritems():
+        for a in din.variables[k].ncattrs():
+            if a == 'scale_factor' or a == 'add_offset' or a == '_FillValue': 
continue
+            v.setncattr(a, din.variables[k].getncattr(a))
+        if k == variable:
+            try:
+#                if fillValue == None: fillValue = 
din.variables[k].getncattr('_FillValue')        # total kludge
+                if fillValue == None: fillValue = default_fillvals['f4']
+#                print >>sys.stderr, default_fillvals
+#                v.setncattr('_FillValue', fillValue)         # set proper 
_FillValue for climatology array
+                v.setncattr('missing_value', fillValue)
+                print >>sys.stderr, 'Setting missing_value for primary 
variable %s to %f' % (variable, fillValue)
+            except:
+                print >>sys.stderr, 'writeOutNetcdfVars: Warning, for variable 
%s no fill value specified or derivable from inputs.' % variable
+    din.close()
+    dout.close()
+    return outFile
+    
+
+def contourMap(var, variable, coordinates, n, outFile):
+    figFile = os.path.splitext(outFile)[0] + '_hist.png'
+    # TODO: Downscale variable array (SST) before contouring, matplotlib is 
TOO slow on large arrays
+    vals = var[variable][:]
+
+    # Fixed color scale, write file, turn off auto borders, set title, reverse 
lat direction so monotonically increasing??
+    imageMap(var[coordinates[1]][:], var[coordinates[0]][:], var[variable][:],
+             vmin=-2., vmax=45., outFile=figFile, autoBorders=False,
+             title='%s %d-day Mean from %s' % (variable.upper(), n, outFile))
+    print >>sys.stderr, 'Writing contour plot to %s' % figFile
+    return figFile
+
+
+def histogram(vals, variable, n, outFile):
+    figFile = os.path.splitext(outFile)[0] + '_hist.png'
+    M.clf()
+#    M.hist(vals, 47, (-2., 45.))
+    M.hist(vals, 94)
+    M.xlim(-5, 45)
+    M.xlabel('SST in degrees Celsius')
+    M.ylim(0, 300000)
+    M.ylabel('Count')
+    M.title('Histogram of %s %d-day Mean from %s' % (variable.upper(), n, 
outFile))
+    M.show()
+    print >>sys.stderr, 'Writing histogram plot to %s' % figFile
+    M.savefig(figFile)
+    return figFile
+
+
+def dailyFile2date(path, offset=1):
+    '''Convert YYYYDOY string in filename to date.'''
+    fn = os.path.split(path)[1]
+    year = int(fn[offset:offset+4])
+    doy = int(fn[offset+5:offset+8])
+    return fn[5:15].replace('.', '/')
+
+
+def formatRegion(r):
+    """Format lat/lon region specifier as string suitable for file name."""
+    if isinstance(r, str):
+        return r
+    else:
+        strs = [str(i).replace('-', 'm') for i in r]
+        return 'region-%s-%sby%s-%s' % tuple(strs)
+
+
+def formatGrid(r):
+    """Format lat/lon grid resolution specifier as string suitable for file 
name."""
+    if isinstance(r, str):
+        return r
+    else:
+        return str(r[0]) + 'by' + str(r[1])
+
+
+def main(args):
+    nEpochs = int(args[0])
+    nWindow = int(args[1])
+    nNeighbors = int(args[2])
+    averager = args[3]
+    optimization = args[4]
+    mode = args[5]
+    nWorkers = int(args[6])
+    urlFile = args[7]
+    urls = [s.strip() for s in open(urlFile, 'r')]
+    if averager == 'gaussInterp':
+        results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 
'sst', 'qual_sst', ['lat', 'lon'],
+                             averager=averager, optimization=optimization, 
averagingConfig=GaussInterpConfig,
+                             mode=mode, nWorkers=nWorkers)
+    elif averager == 'spatialFilter':
+        results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 
'sst', 'qual_sst', ['lat', 'lon'],
+                             averager=averager, optimization=optimization, 
averagingConfig=SpatialFilterConfig1,
+                             mode=mode, nWorkers=nWorkers)
+    elif averager == 'pixelMean':
+        results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 
'sst', 'qual_sst', ['lat', 'lon'],
+                             averager=averager, optimization=optimization, 
averagingConfig=PixelMeanConfig,
+                             mode=mode, nWorkers=nWorkers)
+    else:
+        print >>sys.stderr, 'climatology2: Error, averager must be one of', 
AveragingFunctions.keys()
+        sys.exit(1)
+    
+    if results[0][1] is not None:
+        makeMovie([r[1] for r in results], 'clim.mpg')    
+    return results
+
+    
+if __name__ == '__main__':
+    print main(sys.argv[1:])
+
+
+# Old Tests:
+# python climatology2.py 5 5 0 pixelMean   fortran sequential 1 
urls_sst_10days.txt
+# python climatology2.py 5 5 3 gaussInterp fortran sequential 1 
urls_sst_10days.txt
+
+# python climatology2.py 5 5 0 pixelMean   fortran sequential 1 
urls_sst_40days.txt
+# python climatology2.py 5 5 0 pixelMean   fortran multicore  8 
urls_sst_40days.txt
+# python climatology2.py 5 5 3 gaussInterp fortran multicore  8 
urls_sst_40days.txt
+
+# Old Production:
+# python climatology2.py 5 5 0 pixelMean     fortran multicore  16 
urls_sst_2015.txt  >& log &
+# python climatology2.py 10 10 0 pixelMean     fortran multicore  16 
urls_sst_2015.txt  >& log &
+# python climatology2.py 5 5 3 gaussInterp   fortran multicore  16 
urls_sst_2015.txt  >& log &
+
+
+# Tests:
+# python climatology2.py 5 5 0 pixelMean   fortran sequential 1 
urls_sst_daynight_5days_sorted.txt
+# python climatology2.py 5 5 0 pixelMean   fortran multicore  4 
urls_sst_daynight_5days_sorted.txt
+# python climatology2.py 5 5 3 gaussInterp fortran sequential 1 
urls_sst_daynight_5days_sorted.txt
+# python climatology2.py 5 5 3 gaussInterp fortran multicore  4 
urls_sst_daynight_5days_sorted.txt
+# python climatology2.py 5 7 1 spatialFilter fortran sequential 1 
urls_sst_daynight_5days_sorted.txt
+
+# Test number of neighbors needed:
+# python climatology2.py 5 7 3 gaussInterp fortran multicore  4 
urls_sst_daynight_20days_sorted.txt
+
+
+# Production:
+# python climatology2.py 5 7 0 pixelMean fortran multicore  4 
urls_sst_daynight_2003_2015_sorted.txt
+# python climatology2.py 5 7 3 gaussInterp fortran sequential 1 
urls_sst_daynight_2003_2015_sorted.txt
+# python climatology2.py 5 7 3 gaussInterp fortran multicore  4 
urls_sst_daynight_2003_2015_sorted.txt
+# python climatology2.py 5 7 1 spatialFilter fortran multicore  4 
urls_sst_daynight_2003_2015_sorted.txt
+

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/ClimatologySpark2.py
----------------------------------------------------------------------
diff --git a/climatology/clim/ClimatologySpark2.py 
b/climatology/clim/ClimatologySpark2.py
new file mode 100755
index 0000000..64b0510
--- /dev/null
+++ b/climatology/clim/ClimatologySpark2.py
@@ -0,0 +1,636 @@
+"""
+ClimatologySpark2.py
+
+Compute a multi-epoch (multi-day) climatology from daily SST Level-3 grids.
+
+Simple code to be run on Spark cluster, or using multi-core parallelism on 
single machine via pysparkling.
+
+"""
+
+import sys, os, urlparse, urllib, re, time, glob
+import numpy as N
+import matplotlib
+
+matplotlib.use('Agg')
+import matplotlib.pylab as M
+
+from clim.variables import getVariables, close
+from clim.cache import retrieveFile, CachePath, hdfsCopyFromLocal
+from netCDF4 import Dataset, default_fillvals
+from clim.plotlib import imageMap, makeMovie
+
+from clim.spatialFilter import spatialFilter
+from clim.gaussInterp import gaussInterp  # calls into Fortran version 
gaussInterp_f.so
+# from clim.gaussInterp_slow import gaussInterp_slow as gaussInterp       # 
pure python, slow debuggable version
+
+from clim.datasets import DatasetList, ModisSst, ModisChlor, MeasuresSsh
+from clim.sort import sortByKeys
+from clim.split import groupByKeys, splitByNDaysKeyed
+
+from pyspark import SparkContext, SparkConf
+
+DaskClientEndpoint = "daskclient:8786"
+
+VERBOSE = 0
+CollectAndTime = 0
+
+# Possible execution modes
+# Multicore & cluster modes use pathos pool.map(); Spark mode uses PySpark 
cluster.
+ExecutionModes = ['spark', 'mesos', 'multicore']
+
+
+# SST L3m 4.6km Metadata
+# SST calues are scaled integers in degrees Celsius, lon/lat is 8640 x 4320
+# Variable = 'sst', Mask = 'qual_sst', Coordinates = ['lon', 'lat']
+
+# Simple mask and average functions to get us started, then add gaussian 
interpolation.
+# MODIS L3 SST product, qual_sst is [-1, 2] - =0 is best data, can add =1 for 
better coverage
+
+def qcMask(var, mask): return N.ma.masked_where(mask != 0, var)
+
+
+# Example Configurations
+PixelMeanConfig = {'name': 'pixelMean', 'accumulators': ['count', 'mean', 
'M2']}
+
+GaussInterpConfig = {'name': 'gaussInterp',
+                     'latGrid': None, 'lonGrid': None,  # None means use input 
lat/lon grid
+                     'wlat': 3, 'wlon': 3,
+                     'slat': 0.15, 'slon': 0.15, 'stime': 1,
+                     'vfactor': -0.6931, 'missingValue': 
default_fillvals['f4'],
+                     'optimization': 'fortran'}
+
+GaussInterpConfig1a = {'name': 'gaussInterp',
+                       'latGrid': None, 'lonGrid': None,  # None means use 
input lat/lon grid
+                       'wlat': 0.30, 'wlon': 0.30,
+                       'slat': 0.15, 'slon': 0.15, 'stime': 1,
+                       'vfactor': -0.6931, 'missingValue': 
default_fillvals['f4'],
+                       'optimization': 'fortran'}
+
+GaussInterpConfig1b = {'name': 'gaussInterp',
+                       'latGrid': None, 'lonGrid': None,  # None means use 
input lat/lon grid
+                       'wlat': 0.08, 'wlon': 0.08,
+                       'slat': 0.15, 'slon': 0.15, 'stime': 1,
+                       'vfactor': -0.6931, 'missingValue': 
default_fillvals['f4'],
+                       'optimization': 'fortran'}
+
+GaussInterpConfig2 = {'name': 'gaussInterp',
+                      'latGrid': (89.5, -89.5, -0.25), 'lonGrid': (-180., 
179., 0.25),
+                      'wlat': 2., 'wlon': 2.,
+                      'slat': 0.15, 'slon': 0.15, 'stime': 1,
+                      'vfactor': -0.6931, 'missingValue': 
default_fillvals['f4'],
+                      'optimization': 'fortran'}
+
+FilterGaussian = [[1, 2, 1], [2, 4, 2], [1, 2, 1]]  # divide by 16
+FilterLowPass = [[1, 1, 1], [1, 1, 1], [1, 1, 1]]  # divide by 9
+
+SpatialFilterConfig1 = {'name': 'spatialFilter', 'normalization': 16.,
+                        'spatialFilter': N.array(FilterGaussian, 
dtype=N.int32),
+                        'missingValue': default_fillvals['f4']}
+
+SpatialFilterConfig2 = {'name': 'spatialFilter', 'normalization': 9.,
+                        'spatialFilter': N.array(FilterLowPass, dtype=N.int32),
+                        'missingValue': default_fillvals['f4']}
+
+# Three kinds of averaging supported
+AveragingFunctions = {'pixelMean': None, 'gaussInterp': gaussInterp, 
'spatialFilter': spatialFilter}
+AveragingConfigs = {'pixelMean': PixelMeanConfig, 'gaussInterp': 
GaussInterpConfig1a,
+                    'spatialFilter': SpatialFilterConfig1}
+
+
+def climByAveragingPeriods(urls,  # list of (daily) granule URLs for a long 
time period (e.g. 15 years), *SORTED* by DOY
+                           datasetInfo,  # class holding dataset metadata
+                           nEpochs,  # compute a climatology for every N 
epochs (days) by 'averaging'
+                           nWindow,  # number of epochs in window needed for 
averaging
+                           variable,  # name of primary variable in file
+                           mask,  # name of mask variable
+                           coordinates,  # names of coordinate arrays to read 
and pass on (e.g. 'lat' and 'lon')
+                           reader,  # reader function that reads the variable 
from the file into a numpy masked array
+                           outHdfsPath,
+                           # HDFS path to write outputs to (i.e. netCDF file 
containing stats and optional contour plot)
+                           splitter,  # split function to use to partition the 
input URL list
+                           masker=qcMask,  # mask function to compute mask 
from mask variable
+                           averager='pixelMean',
+                           # averaging function to use, one of ['pixelMean', 
'gaussInterp', 'spatialFilter']
+                           averagingConfig={},
+                           # dict of parameters to control the averaging 
function (e.g. gaussInterp)
+                           sparkConfig='mesos,8,16',
+                           # Map across time periods of N-days for concurrent 
work, executed by:
+                           # 'spark to use PySpark or 'multicore' using 
pysparkling
+                           # 'spark,n,m' means use N executors and M partitions
+                           averagingFunctions=AveragingFunctions,  # dict of 
possible averaging functions
+                           legalModes=ExecutionModes,  # list of possible 
execution modes
+                           cachePath=CachePath  # directory to cache retrieved 
files in
+                           ):
+    '''Compute a climatology every N days by applying a mask and averaging 
function.
+Writes the climatology average, attributes of the primary variable, and the 
coordinate arrays to a series of netCDF files.
+Optionally generates a contour plot of each N-day climatology for verification.
+    '''
+    try:
+        averageFn = averagingFunctions[averager]
+    except:
+        print >> sys.stderr, 'climatology: Error, Averaging function must be 
one of: %s' % str(averagingFunctions)
+        sys.exit(1)
+    if 'accumulators' in averagingConfig:
+        accumulators = averagingConfig['accumulators']
+
+    # Split daily files into N-day periods for parallel tasks (e.g. 5-day clim 
means 73 periods)
+    # Keyed by integer DOY
+    urls = sortByKeys(urls, datasetInfo.getKeys)
+    urlSplits = splitter(urls, nEpochs)
+    urlSplits = groupByKeys(urlSplits)
+    print >> sys.stderr, 'Number of URL splits = ', len(urlSplits)
+    if VERBOSE: print >> sys.stderr, urlSplits
+    if len(urlSplits) == 0: sys.exit(1)
+
+    # Compute per-pixel statistics in parallel
+    with Timer("Parallel Stats"):
+        if sparkConfig.startswith('dask'):
+            outputFiles = parallelStatsDaskSimple(urlSplits, datasetInfo, 
nEpochs, variable, mask, coordinates, reader,
+                                                  outHdfsPath,
+                                                  averagingConfig, 
sparkConfig, accumulators)
+        else:
+            outputFiles = parallelStatsSparkSimple(urlSplits, datasetInfo, 
nEpochs, variable, mask, coordinates, reader,
+                                                   outHdfsPath,
+                                                   averagingConfig, 
sparkConfig, accumulators)
+    return outputFiles
+
+
+def parallelStatsSparkSimple(urlSplits, ds, nEpochs, variable, mask, 
coordinates, reader, outHdfsPath, averagingConfig,
+                             sparkConfig,
+                             accumulators=['count', 'mean', 'M2', 'min', 
'max']):
+    '''Compute N-day climatology statistics in parallel using PySpark or 
pysparkling.'''
+    with Timer("Configure Spark"):
+        sparkContext, numExecutors, numPartitions = 
configureSpark(sparkConfig, appName='parallelClimStats')
+
+    urlsRDD = sparkContext.parallelize(urlSplits, numPartitions)  # partition 
N-day periods into tasks
+
+    with CollectTimer("Map-reduce over %d partitions" % numPartitions):  # map 
to update accumulators in parallel
+        accum = urlsRDD.map(lambda urls: accumulate(urls, variable, mask, 
coordinates, reader, accumulators))
+
+    with CollectTimer("Compute merged statistics"):
+        stats = accum.map(statsFromAccumulators)  # compute final statistics 
from accumulators
+
+    with Timer("Write out climatology and optionally plot"):  # write stats to 
netCDF file
+        outputFiles = stats.map(
+            lambda s: writeAndPlot(s, ds, variable, coordinates, nEpochs, 
averagingConfig, outHdfsPath, plot=False)) \
+            .collect()
+    return outputFiles
+
+
+def parallelStatsSpark(urlSplits, ds, nEpochs, variable, mask, coordinates, 
reader, outHdfsPath, averagingConfig,
+                       sparkConfig,
+                       accumulators=['count', 'mean', 'M2', 'min', 'max']):
+    '''Compute N-day climatology statistics in parallel using PySpark or 
pysparkling.'''
+    with Timer("Configure Spark"):
+        sparkContext, numExecutors, numPartitions = 
configureSpark(sparkConfig, appName='parallelClimStats')
+
+    urlsRDD = sparkContext.parallelize(urlSplits, numPartitions)  # partition 
N-day periods into tasks
+
+    with CollectTimer("Map-reduce over %d partitions" % numPartitions):
+        merged = urlsRDD.map(lambda urls: accumulate(urls, variable, mask, 
coordinates, reader, accumulators)) \
+            .reduceByKey(combine)  # map-reduce to update accumulators in 
parallel
+
+    with CollectTimer("Compute merged statistics"):
+        stats = merged.map(statsFromAccumulators)  # compute final statistics 
from merged accumulators
+
+    with Timer("Write out climatology and optionally plot"):
+        outputFiles = stats.map(
+            lambda s: writeAndPlot(s, ds, variable, coordinates, nEpochs, 
averagingConfig, outHdfsPath, plot=False)) \
+            .collect()
+    return outputFiles
+
+
+def parallelStatsPipeline(urls, ds, nEpochs, variable, mask, coordinates, 
reader, averagingConfig, outHdfsPath,
+                          accumulators=['count', 'mean', 'M2', 'min', 'max']):
+    '''Hide entire pipeline in a single function.'''
+    outputFile = writeAndPlot(
+        statsFromAccumulators(accumulate(urls, variable, mask, coordinates, 
reader, accumulators)),
+        ds, variable, coordinates, nEpochs, averagingConfig, outHdfsPath, 
plot=False)
+    return outputFile
+
+
+def parallelStatsDaskSimple(urlSplits, ds, nEpochs, variable, mask, 
coordinates, reader, outHdfsPath, averagingConfig,
+                            sparkConfig,
+                            accumulators=['count', 'mean', 'M2', 'min', 
'max']):
+    '''Compute N-day climatology statistics in parallel using PySpark or 
pysparkling.'''
+    if not sparkConfig.startswith('dask,'):
+        print >> sys.stderr, "dask: configuration must be of form 'dask,n'"
+        sys.exit(1)
+    numPartitions = int(sparkConfig.split(',')[1])
+
+    with Timer("Configure Dask distributed"):
+        from distributed import Client, as_completed
+        client = Client(DaskClientEndpoint)
+
+    print >> sys.stderr, 'Starting parallel Stats using Dask . . .'
+    start = time.time()
+    futures = client.map(
+        lambda urls: parallelStatsPipeline(urls, ds, nEpochs, variable, mask, 
coordinates, reader, averagingConfig,
+                                           outHdfsPath, accumulators), 
urlSplits)
+
+    outputFiles = []
+    for future in as_completed(futures):
+        outputFile = future.result()
+        outputFiles.append(outputFile)
+        end = time.time()
+        print >> sys.stderr, "parallelStats: Completed %s in %0.3f seconds." % 
(outputFile, (end - start))
+    return outputFiles
+
+
+def writeAndPlot(stats, ds, variable, coordinates, nEpochs, averagingConfig, 
outHdfsPath, plot=False):
+    '''Write statistics to a netCDF4 file and optionally make a contour 
plot.'''
+    key, stats = stats
+    doy = int(key)
+    urls = stats['_meta']['urls']
+    #    outFile = 'A%03d.L3m_%s_%dday_clim_%s.nc' % (doy, variable, nEpochs, 
averagingConfig['name'])    # mark each file with first day in period
+    outFile = ds.genOutputName(doy, variable, nEpochs, averagingConfig)
+    firstInputFile = retrieveFile(urls[0])
+
+    outFile = writeStats(stats, firstInputFile, variable, coordinates, 
outFile)  # write to netCDF4 file
+    outHdfsFile = hdfsCopyFromLocal(outFile, outHdfsPath)
+
+    if plot:
+        coords = readCoordinates(firstInputFile, coordinates)
+        plotFile = contourMap(stats['mean'], variable, coords, nEpochs, 
outFile + '.png')
+        plotHdfsFile = hdfsCopyFromLocal(plotFile, outHdfsPath)
+        return (outHdfsFile, plotHdfsFile)
+    else:
+        return outHdfsFile
+
+
+def configureSpark(sparkConfig, appName, memoryPerExecutor='4G', 
coresPerExecutor=1):
+    mode, numExecutors, numPartitions = sparkConfig.split(',')
+    numExecutors = int(numExecutors)
+    print >> sys.stderr, 'numExecutors = ', numExecutors
+    numPartitions = int(numPartitions)
+    print >> sys.stderr, 'numPartitions = ', numPartitions
+    if mode == 'multicore':
+        print >> sys.stderr, 'Using pysparkling'
+        import pysparkling
+        sc = pysparkling.Context()
+    else:
+        print >> sys.stderr, 'Using PySpark'
+        sparkMaster = mode
+        spConf = SparkConf()
+        spConf.setAppName(appName)
+        spConf.set("spark.executorEnv.HOME",
+                   os.path.join(os.getenv('HOME'), 'spark_exec_home'))
+        spConf.set("spark.executorEnv.PYTHONPATH", os.getcwd())
+        spConf.set("spark.executor.memory", memoryPerExecutor)
+        print >> sys.stderr, 'memoryPerExecutor = ', memoryPerExecutor
+        try:
+            sparkMaster = SparkMasterOverride
+        except:
+            pass
+        if sparkMaster[:5] == "mesos":
+            spConf.set("spark.cores.max", numExecutors)
+        else:
+            # Spark master is YARN or local[N]
+            spConf.set("spark.executor.instances", numExecutors)
+            spConf.set("spark.executor.cores", coresPerExecutor)
+            spConf.setMaster(sparkMaster)
+        sc = SparkContext(conf=spConf)
+    return sc, numExecutors, numPartitions
+
+
+def readAndMask(url, variable, mask=None, cachePath=CachePath, hdfsPath=None):
+    '''Read a variable from a netCDF or HDF file and return a numpy masked 
array.
+If the URL is remote or HDFS, first retrieve the file into a cache directory.
+    '''
+    v = None
+    if mask:
+        variables = [variable, mask]
+    else:
+        variables = [variable]
+    try:
+        path = retrieveFile(url, cachePath, hdfsPath)
+    except:
+        print >> sys.stderr, 'readAndMask: Error, continuing without file %s' 
% url
+        return v
+
+    try:
+        print >> sys.stderr, 'Reading variable %s from %s' % (variable, path)
+        var, fh = getVariables(path, variables, arrayOnly=True,
+                               set_auto_mask=True)  # return dict of variable 
objects by name
+        v = var[
+            variable]  # could be masked array
+        if v.shape[0] == 1: v = v[0]  # throw away trivial time dimension for 
CF-style files
+        if VERBOSE: print >> sys.stderr, 'Variable range: %fs to %f' % 
(v.min(), v.max())
+        close(fh)
+    except:
+        print >> sys.stderr, 'readAndMask: Error, cannot read variable %s from 
file %s' % (variable, path)
+
+    return v
+
+
+def readCoordinates(path, coordinates=['lat', 'lon']):
+    '''Read coordinate arrays from local netCDF file.'''
+    var, fh = getVariables(path, coordinates, arrayOnly=True, 
set_auto_mask=True)
+    close(fh)
+    return [var[k] for k in coordinates]
+
+
+def accumulate(urls, variable, maskVar, coordinates, reader=readAndMask,
+               accumulators=['count', 'mean', 'M2', 'min', 'max'], 
cachePath=CachePath, hdfsPath=None):
+    '''Accumulate data into statistics accumulators like count, sum, sumsq, 
min, max, M3, M4, etc.'''
+    keys, urls = urls
+    print >> sys.stderr, 'Updating accumulators %s for key %s' % 
(str(accumulators), str(keys))
+    accum = {}
+    accum['_meta'] = {'urls': urls, 'coordinates': coordinates}
+    for i, url in enumerate(urls):
+        v = reader(url, variable, maskVar, cachePath, hdfsPath)
+        if v is None: continue
+
+        if i == 0:
+            for k in accumulators:
+                if k[0] == '_': continue
+                if k == 'min':
+                    accum[k] = default_fillvals['f8'] * N.ones(v.shape, 
dtype=N.float64)
+                elif k == 'max':
+                    accum[k] = -default_fillvals['f8'] * N.ones(v.shape, 
dtype=N.float64)
+                elif k == 'count':
+                    accum[k] = N.zeros(v.shape, dtype=N.int64)
+                else:
+                    accum[k] = N.zeros(v.shape, dtype=N.float64)
+
+        if N.ma.isMaskedArray(v):
+            if 'count' in accumulators:
+                accum['count'] += ~v.mask
+            if 'min' in accumulators:
+                accum['min'] = N.ma.minimum(accum['min'], v)
+            if 'max' in accumulators:
+                accum['max'] = N.ma.maximum(accum['max'], v)
+
+            v = N.ma.filled(v, 0.)
+        else:
+            if 'count' in accumulators:
+                accum['count'] += 1
+            if 'min' in accumulators:
+                accum['min'] = N.minimum(accum['min'], v)
+            if 'max' in accumulators:
+                accum['max'] = N.maximum(accum['max'], v)
+
+        if 'mean' in accumulators:
+            n = accum['count']
+            #            mask = N.not_equal(n, 0)
+            # subtract running mean from new values, eliminate roundoff errors
+            #            delta = N.choose(mask, (0, v - accum['mean']))
+            delta = v - accum['mean']
+            #            delta_n = N.choose(mask, (0, delta/n))
+            delta_n = delta / n
+            delta_n = N.nan_to_num(delta_n)  # set to zero if n=0 caused a NaN
+            accum['mean'] += delta_n
+        if 'M2' in accumulators:
+            term = delta * delta_n * (n - 1)
+            accum['M2'] += term
+    return (keys, accum)
+
+
+def combine(a, b):
+    '''Combine accumulators by summing.'''
+    print >> sys.stderr, 'Combining accumulators . . .'
+    if 'urls' in a['_meta'] and 'urls' in b['_meta']:
+        a['_meta']['urls'].extend(b['_meta']['urls'])
+    if 'mean' in a:
+        ntotal = a['count'] + b['count']
+        #        mask = N.not_equal(ntotal, 0)
+        #        a['mean'] = N.choose(mask, (0, (a['mean'] * a['count'] + 
b['mean'] * b['count']) / ntotal))
+        a['mean'] = (a['mean'] * a['count'] + b['mean'] * b['count']) / ntotal
+    if 'min' in a:
+        if N.ma.isMaskedArray(a):
+            a['min'] = N.ma.minimum(a['min'], b['min'])
+        else:
+            a['min'] = N.minimum(a['min'], b['min'])
+    if 'max' in a:
+        if N.ma.isMaskedArray(a):
+            a['max'] = N.ma.maximum(a['max'], b['max'])
+        else:
+            a['max'] = N.maximum(a['max'], b['max'])
+    for k in a.keys():
+        if k[0] == '_': continue
+        if k != 'mean' and k != 'min' and k != 'max':
+            a[k] += b[k]  # just sum count and other moments
+    return a
+
+
+def statsFromAccumulators(accum):
+    '''Compute final statistics from accumulators.'''
+    print >> sys.stderr, 'Computing statistics from accumulators . . .'
+    keys, accum = accum
+
+    # Mask all of the accumulator arrays for zero counts
+    if 'count' in accum:
+        accum['count'] = N.ma.masked_less_equal(accum['count'], 0, copy=False)
+        mask = accum['count'].mask
+        for k in accum:
+            if k[0] == '_': continue
+            if k != 'count':
+                accum[k] = N.ma.array(accum[k], copy=False, mask=mask)
+
+    # Compute stats
+    stats = {}
+    if '_meta' in accum:
+        stats['_meta'] = accum['_meta']
+    if 'count' in accum:
+        stats['count'] = accum['count']
+    if 'min' in accum:
+        stats['min'] = accum['min']
+    if 'max' in accum:
+        stats['max'] = accum['max']
+    if 'mean' in accum:
+        stats['mean'] = accum['mean']
+    if 'M2' in accum:
+        stats['stddev'] = N.sqrt(accum['M2'] / (accum['count'] - 1))
+
+        # Convert stats arrays to masked arrays, keeping only cells with count 
> 0
+    #    mask = stats['count'] <= 0
+    #    for k in stats:
+    #        if k[0] == '_': continue
+    #        stats[k] = N.ma.masked_where(mask, stats[k], copy=False)
+    return (keys, stats)
+
+
+def writeStats(stats, inputFile, variable, coordinates, outFile, 
format='NETCDF4', cachePath='cache'):
+    '''Write out stats arrays to netCDF with some attributes.'''
+    if os.path.exists(outFile): os.unlink(outFile)
+    dout = Dataset(outFile, 'w', format=format)
+    print >> sys.stderr, 'Writing stats for variable %s to %s ...' % 
(variable, outFile)
+    print >> sys.stderr, 'Shape:', stats['mean'].shape
+    dout.setncattr('variable', variable)
+    dout.setncattr('urls', str(stats['_meta']['urls']))
+
+    din = Dataset(inputFile, 'r')
+    # Transfer global attributes from input file
+    #    for a in din.ncattrs():
+    #        dout.setncattr(a, din.getncattr(a))
+
+    print >> sys.stderr, 'Using coordinates & attributes from %s' % inputFile
+    coordinatesSave = coordinates
+    try:
+        coordinatesFromFile = din.variables[variable].getncattr('coordinates')
+        if 'lat' in coorindatesFromFile.lower() and 'lon' in 
coordinatesFromFile.lower():
+            coordinates = coordinatesFromFile.split()
+            if coordinates[0].lower() == 'time':   coordinates = coordinates[
+                                                                 1:]  # 
discard trivial time dimension for CF-style files
+        else:
+            coordinates = coordinatesSave  # use input coordinates
+    except:
+        if coordinates is None or len(coordinates) == 0:
+            coordinates = ('lat', 'lon')  # kludge: another possibility
+
+    # Add dimensions and variables, copying data                               
                                           
+    coordDim = [dout.createDimension(coord, din.variables[coord].shape[0]) for 
coord in
+                coordinates]  # here lat, lon, alt, etc.
+    for coord in coordinates:
+        var = dout.createVariable(coord, din.variables[coord].dtype, (coord,))
+        var[:] = din.variables[coord][:]
+
+    # Add stats variables                                                      
                                           
+    for k, v in stats.items():
+        if k[0] == '_': continue
+        var = dout.createVariable(k, stats[k].dtype, coordinates)
+        fillVal = default_fillvals[v.dtype.str.strip("<>")]  # remove endian 
part of dtype to do lookup
+        #        print >>sys.stderr, "Setting _FillValue attribute for %s to 
%s" % (k, fillVal)
+        var.setncattr('_FillValue', fillVal)
+        var[:] = v[:]
+
+        # Add attributes from variable in input file (does this make sense?)
+        if k == 'count': continue
+        try:
+            vin = din.variables[variable]
+            for a in vin.ncattrs():
+                if a == 'scale_factor' or a == 'add_offset' or a == 
'_FillValue': continue
+                var.setncattr(a, vin.getncattr(a))
+        except KeyError:
+            pass
+
+    din.close()
+    dout.close()
+    return outFile
+
+
+# CollectTimer context manager
+class CollectTimer(object):
+    '''Automatically collect Spark result and do timing.'''
+
+    def __init__(self, name):
+        self.name = name
+        self._collect = CollectAndTime
+
+    def __enter__(self):
+        self.start = time.time()
+
+    def __exit__(self, ty, val, tb):
+        if not self._collect:
+            return False
+        else:
+            # what goes here??
+            end = time.time()
+            print >> sys.stderr, "timer: " + self.name + ": %0.3f seconds" % 
(end - self.start)
+            sys.stdout.flush()
+            return False
+
+
+# Timer context manager
+class Timer(object):
+    '''Automatically collect Spark result and do timing.'''
+
+    def __init__(self, name):
+        self.name = name
+
+    def __enter__(self):
+        self.start = time.time()
+
+    def __exit__(self, ty, val, tb):
+        end = time.time()
+        print >> sys.stderr, "timer: " + self.name + ": %0.3f seconds" % (end 
- self.start)
+        sys.stdout.flush()
+        return False
+
+
+def contourMap(var, variable, coordinates, n, plotFile):
+    '''Make contour plot for the var array using coordinates vector for 
lat/lon coordinates.'''
+    # TODO: Downscale variable array (SST) before contouring, matplotlib is 
TOO slow on large arrays
+    # Fixed color scale, write file, turn off auto borders, set title, etc.
+    lats = coordinates[0][:]
+    lons = coordinates[1][:]
+    if lats[1] < lats[0]:  # if latitudes decreasing, reverse
+        lats = N.flipud(lats)
+        var = N.flipud(var[:])
+
+    imageMap(lons, lats, var,
+             vmin=-2., vmax=45., outFile=plotFile, autoBorders=False,
+             title='%s %d-day Mean from %s' % (variable.upper(), n, 
os.path.splitext(plotFile)[0]))
+    print >> sys.stderr, 'Writing contour plot to %s' % plotFile
+    return plotFile
+
+
+def histogram(vals, variable, n, outFile):
+    figFile = os.path.splitext(outFile)[0] + '_hist.png'
+    M.clf()
+    #    M.hist(vals, 47, (-2., 45.))
+    M.hist(vals, 94)
+    M.xlim(-5, 45)
+    M.xlabel('SST in degrees Celsius')
+    M.ylim(0, 300000)
+    M.ylabel('Count')
+    M.title('Histogram of %s %d-day Mean from %s' % (variable.upper(), n, 
outFile))
+    M.show()
+    print >> sys.stderr, 'Writing histogram plot to %s' % figFile
+    M.savefig(figFile)
+    return figFile
+
+
+def computeClimatology(datasetName, nEpochs, nWindow, averager, outHdfsPath, 
sparkConfig):
+    '''Compute an N-day climatology for the specified dataset and write the 
files to HDFS.'''
+    if averager not in AveragingFunctions:
+        print >> sys.stderr, 'computeClimatology: Error, averager %s must be 
in set %s' % (
+            averager, str(AveragingFunctions.keys()))
+        sys.exit(1)
+    try:
+        ds = DatasetList[datasetName]  # get dataset metadata class
+    except:
+        print >> sys.stderr, 'computeClimatology: Error, %s not in dataset 
list %s.' % (datasetName, str(DatasetList))
+        sys.exit(1)
+    urls = glob.glob(ds.UrlsPath)
+    with Timer("climByAveragingPeriods"):
+        results = climByAveragingPeriods(urls, ds, nEpochs, nWindow, 
ds.Variable, ds.Mask, ds.Coordinates,
+                                         getattr(ds, "readAndMask", 
readAndMask), outHdfsPath, ds.split,
+                                         averager=averager, 
averagingConfig=AveragingConfigs[averager],
+                                         sparkConfig=sparkConfig)
+    return results
+
+
+def main(args):
+    dsName = args[0]
+    nEpochs = int(args[1])
+    nWindow = int(args[2])
+    averager = args[3]
+    sparkConfig = args[4]
+    outHdfsPath = args[5]
+
+    results = computeClimatology(dsName, nEpochs, nWindow, averager, 
outHdfsPath, sparkConfig)
+
+    if isinstance(results[0], tuple):
+        makeMovie([r[1] for r in results], 'clim.mpg')
+    return results
+
+
+if __name__ == '__main__':
+    print main(sys.argv[1:])
+
+
+# Debug Tests:
+# python ClimatologySpark2.py ModisSst    5 5 pixelMean   multicore,1,1   
cache/clim
+# python ClimatologySpark2.py ModisChlor  5 5 pixelMean   multicore,1,1   
cache/clim
+# python ClimatologySpark2.py MeasuresSsh 5 5 pixelMean   multicore,1,1   
cache/clim
+# python ClimatologySpark2.py ModisSst    5 5 pixelMean   multicore,2,2  
cache/clim
+
+# Production:
+# time python ClimatologySpark2.py ModisSst    5 5 pixelMean mesos,37,37 
cache/clim
+# time python ClimatologySpark2.py ModisChlor  5 5 pixelMean mesos,37,37 
cache/clim
+# time python ClimatologySpark2.py MeasuresSsh 5 5 pixelMean mesos,37,37 
cache/clim
+
+# time python ClimatologySpark2.py ModisSst    5 5 pixelMean dask,37  
cache/clim
+# time python ClimatologySpark2.py ModisChlor  5 5 pixelMean dask,37  
cache/clim
+# time python ClimatologySpark2.py MeasuresSsh 5 5 pixelMean dask,37  
cache/clim

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/README.md
----------------------------------------------------------------------
diff --git a/climatology/clim/README.md b/climatology/clim/README.md
new file mode 100644
index 0000000..f99ae80
--- /dev/null
+++ b/climatology/clim/README.md
@@ -0,0 +1,32 @@
+# climatology-gaussianInterp module
+
+Compute N-day climatology from daily ocean data files.
+
+See bottom of climatology2.py for examples of what to run.
+
+For speedup, the gaussian weighting is also implemented in Fortran
+and cython.  The pure python version is abysmally slow, of course,
+and even the Fortran takes a while (evaluating too many exponentials).
+
+To build gaussInter_f.so from gaussInterp_f.f, run the line in
+gaussInterp_f.mk.
+
+
+
+3 Climatology’s to generate. Daily data is already downloaded to server-1. 
Location is (a) shown below
+ 
+1.      MODIS CHL_A
+a.      server-1:/data/share/datasets/MODIS_L3m_DAY_CHL_chlor_a_4km/daily_data
+b.      https://oceandata.sci.gsfc.nasa.gov/MODIS-Aqua/Mapped/Daily/4km/chlor_a
+
+2.      Measures SSH
+a.      server-1:/data/share/datasets/MEASURES_SLA_JPL_1603/daily_data
+b.      
http://podaac.jpl.nasa.gov/dataset/SEA_SURFACE_HEIGHT_ALT_GRIDS_L4_2SATS_5DAY_6THDEG_V_JPL1609
+c.       ftp://podaac-ftp.jpl.nasa.gov/allData/merged_alt/L4/cdr_grid/
+
+3.      CCMP Wind
+a.      server-1:/data/share/datasets/CCMP_V2.0_L3.0/daily_data/
+b.      
https://podaac.jpl.nasa.gov/dataset/CCMP_MEASURES_ATLAS_L4_OW_L3_0_WIND_VECTORS_FLK
+ 
+
+

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/__init__.py
----------------------------------------------------------------------
diff --git a/climatology/clim/__init__.py b/climatology/clim/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/binsum.f
----------------------------------------------------------------------
diff --git a/climatology/clim/binsum.f b/climatology/clim/binsum.f
new file mode 100644
index 0000000..96b65af
--- /dev/null
+++ b/climatology/clim/binsum.f
@@ -0,0 +1,64 @@
+c $Revision: 2.4 $
+c     subprogram binsum
+
+c-- calculate interpolation weights and vsum
+
+      subroutine binsum( nf, x, y, z, f, 
+     &                  xx, yy, zz, vsum,
+     &                  ixmin, ixmax, iymin,
+     &                  iymax, izmin, izmax, 
+     &                  imax, jmax, kmax, ndatamax )
+
+      real*4 x(ndatamax),y(ndatamax),z(ndatamax),f(ndatamax)
+      real*4 xx(imax),yy(jmax),zz(kmax)
+      real*4 vsum(2,imax,jmax,kmax)
+      integer*4 ixmin(ndatamax),ixmax(ndatamax)
+      integer*4 iymin(ndatamax),iymax(ndatamax)
+      integer*4 izmin(ndatamax),izmax(ndatamax)
+
+      common /parms/ imin,jmin,kmin,
+     &               xwin2,ywin2,zwin2,
+     &               xh,yh,zh,
+     &               dx,dy,dz,
+     &               xlo,xhi,ylo,yhi,zlo,
+     &               dxd,dyd
+
+      data xfac/-0.6931/
+
+      do n=1,nf
+        ixmin(n)=max(nint((x(n)-xwin2-xlo+0.5*dx)/dx),1)
+        ixmax(n)=min(nint((x(n)+xwin2-xlo+0.5*dx)/dx),imax)
+        iymin(n)=max(nint((y(n)-ywin2-ylo+0.5*dy)/dy),1)
+        iymax(n)=min(nint((y(n)+ywin2-ylo+0.5*dy)/dy),jmax)
+        izmin(n)=max(nint((z(n)-zwin2-zlo+0.5*dz)/dz),1)
+        izmax(n)=min(nint((z(n)+zwin2-zlo+0.5*dz)/dz),kmax)
+c        print *, x(n),y(n),z(n), f(n)
+c      print *,' ixmin, ixmax', ixmin(n), ixmax(n)
+c      print *,' iymin, iymax', iymin(n), iymax(n)
+c      print *,' izmin, izmax', izmin(n), izmax(n)
+      enddo
+
+
+
+      do n=1,nf 
+        do kk=izmin(n),izmax(n)
+          do jj=iymin(n),iymax(n)
+            do ii=ixmin(n),ixmax(n)
+             
+c- - this is the algorithm coded for weights
+
+                fac=exp( xfac*(((x(n)-xx(ii))/xh)**2
+     &                       + ((y(n)-yy(jj))/yh)**2
+     &                      + ((z(n)-zz(kk))/zh)**2) )
+
+c            print *, 'x, xx,  y, yy,  z, zz, fac f',
+c     &        x(n), xx(ii),  y(n), yy(jj), z(n), zz(kk), fac, f(n)
+
+                vsum(1,ii,jj,kk)=vsum(1,ii,jj,kk)+f(n)*fac
+                vsum(2,ii,jj,kk)=vsum(2,ii,jj,kk)+fac
+            enddo
+          enddo
+        enddo
+      enddo
+      return
+      end

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/cache.py
----------------------------------------------------------------------
diff --git a/climatology/clim/cache.py b/climatology/clim/cache.py
new file mode 100644
index 0000000..f5aa1f2
--- /dev/null
+++ b/climatology/clim/cache.py
@@ -0,0 +1,73 @@
+"""
+cache.py
+
+Utilities to retrieve files and cache them at deterministic paths.
+
+"""
+
+import sys, os, urlparse, urllib
+
+# Directory to cache retrieved files in
+CachePath = '/tmp/cache'
+
+
+def isLocalFile(url):
+    '''Check if URL is a local path.'''
+    u = urlparse.urlparse(url)
+    if u.scheme == '' or u.scheme == 'file':
+        if not os.path.exists(u.path):
+            print >>sys.stderr, 'isLocalFile: File at local path does not 
exist: %s' % u.path
+        return (True, u.path)
+    else:
+        return (False, u.path)
+
+
+def retrieveFile(url, cachePath=CachePath, hdfsPath=None, retries=3):
+    '''Retrieve a file from a URL or from an HDFS path.'''
+    if hdfsPath:
+        hdfsFile = os.path.join(hdfsPath, url)
+        return hdfsCopyToLocal(hdfsFile, cachePath)
+    else:
+        return retrieveFileWeb(url, cachePath, retries)
+
+
+def retrieveFileWeb(url, cachePath=CachePath, retries=3):
+    '''Retrieve a file from a URL, or if it is a local path then verify it 
exists.'''
+    if cachePath is None: cachePath = './'
+    ok, path = isLocalFile(url)
+    if ok: return path
+
+    fn = os.path.split(path)[1]
+    outPath = os.path.join(cachePath, fn)
+    if os.path.exists(outPath):
+        print >>sys.stderr, 'retrieveFile: Using cached file: %s' % outPath
+        return outPath
+    else:
+        print >>sys.stderr, 'retrieveFile: Retrieving (URL) %s to %s' % (url, 
outPath)
+        for i in xrange(retries):
+            try:
+                urllib.urlretrieve(url, outPath)
+                return outPath
+            except:
+                print >>sys.stderr, 'retrieveFile: Error retrieving file at 
URL: %s' % url
+                print >>sys.stderr, 'retrieveFile: Retrying ...'
+        print >>sys.stderr, 'retrieveFile: Fatal error, Cannot retrieve file 
at URL: %s' % url
+        return None
+
+
+def hdfsCopyFromLocal(src, dest):
+    '''Copy local file into HDFS directory, overwriting using force switch.'''
+    outPath = os.path.join(dest, os.path.split(src)[1])
+    cmd = "hadoop fs -copyFromLocal -f %s %s" % (src, dest)
+    print >>sys.stderr, "Exec overwrite: %s" % cmd
+    os.system(cmd)
+    return outPath
+
+def hdfsCopyToLocal(src, dest):
+    '''Copy HDFS file to local path, overwriting.'''
+    outPath = os.path.join(dest, os.path.split(src)[1])
+    os.unlink(outPath)
+    cmd = "hadoop fs -copyToLocal %s %s" % (src, dest)
+    print >>sys.stderr, "Exec overwrite: %s" % cmd
+    os.system(cmd)
+    return outPath

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/climatology.py
----------------------------------------------------------------------
diff --git a/climatology/clim/climatology.py b/climatology/clim/climatology.py
new file mode 100755
index 0000000..940a8d0
--- /dev/null
+++ b/climatology/clim/climatology.py
@@ -0,0 +1,233 @@
+"""
+climatology.py
+
+Compute a multi-epoch (multi-day) climatology from daily SST Level-3 grids.
+
+Simple code to be run on Spark cluster, or using multi-core parallelism on 
single machine.
+
+"""
+
+import sys, os, calendar, urlparse, urllib
+from datetime import datetime
+import numpy as N
+from variables import getVariables, close
+#from timePartitions import partitionFilesByKey
+from split import fixedSplit
+#from stats import Stats
+from pathos.multiprocessing import ProcessingPool as Pool
+from plotlib import imageMap, makeMovie
+
+#from gaussInterp import gaussInterp
+
+VERBOSE = 1
+
+# Possible execution modes
+# Multicore & cluster modes use pathos pool.map(); Spark mode uses PySpark 
cluster.
+ExecutionModes = ['sequential', 'multicore', 'cluster', 'spark']
+
+# SST L3m 4.6km Metadata
+# SST calues are scaled integers in degrees Celsius, lat/lon is 4320 x 8640
+# Variable = 'sst', Mask = 'qual_sst', Coordinates = ['lat', 'lon']
+
+# Generate algorithmic name for N-day Climatology product
+SSTClimatologyTemplate = 
'SST.L3.Global.Clim.%(period)s.%(date)s.%(version)s.nc'  #??
+
+# Simple mask and average functions to get us started, then add gaussian 
interpolation.
+# MODIS L3 SST product, qual_sst is [-1, 2] - =0 is best data, can add =1 for 
better coverage
+def qcMask(var, mask): return N.ma.array(var, mask=N.ma.make_mask(mask))
+#def qcMask(var, mask): return N.ma.masked_where(mask != 0, var)
+
+def average(a): return N.ma.mean(a, axis=0)
+#AveragingFunctions = {'pixelAverage': average, 'gaussInterp': gaussInterp}
+AveragingFunctions = {'pixelAverage': average, 'gaussInterp': average}
+
+
+def climByAveragingPeriods(urls,              # list of (daily) granule URLs 
for a long time period (e.g. a year)
+                    nEpochs,                  # compute a climatology for 
every N epochs (days) by 'averaging'
+                    nWindow,                  # number of epochs in window 
needed for averaging
+                    variable,                 # name of primary variable in 
file
+                    mask,                     # name of mask variable
+                    coordinates,              # names of coordinate arrays to 
read and pass on (e.g. 'lat' and 'lon')
+                    maskFn=qcMask,            # mask function to compute mask 
from mask variable
+                    averager='pixelAverage',  # averaging function to use, one 
of ['pixelAverage', 'gaussInterp']
+                    mode='sequential',        # Map across time periods of 
N-days for concurrent work, executed by:
+                                              # 'sequential' map, 'multicore' 
using pool.map(), 'cluster' using pathos pool.map(),
+                                              # or 'spark' using PySpark
+                    numNodes=1,               # number of cluster nodes to use
+                    nWorkers=4,               # number of parallel workers per 
node
+                    averagingFunctions=AveragingFunctions,    # dict of 
possible averaging functions
+                    legalModes=ExecutionModes  # list of possiblel execution 
modes
+                   ):
+    '''Compute a climatology every N days by applying a mask and averaging 
function.
+Writes the averaged variable grid, attributes of the primary variable, and the 
coordinate arrays in a dictionary.
+***Assumption:  This routine assumes that the N grids will fit in memory.***
+    '''
+    try:
+        averageFn = averagingFunctions[averager]
+    except :
+        averageFn = average
+        print >>sys.stderr, 'climatology: Error, Averaging function must be 
one of: %s' % str(averagingFunctions)
+
+    urlSplits = [s for s in fixedSplit(urls, nEpochs)]
+    if VERBOSE: print >>sys.stderr, urlSplits
+
+    def climsContoured(urls):
+        n = len(urls)
+        var = climByAveraging(urls, variable, mask, coordinates, maskFn, 
averageFn)
+        return contourMap(var, variable, coordinates, n, urls[0])
+
+    if mode == 'sequential':
+        plots = map(climsContoured, urlSplits)
+    elif mode == 'multicore':
+        pool = Pool(nWorkers)
+        plots = pool.map(climsContoured, urlSplits)        
+    elif mode == 'cluster':
+        pass
+    elif mode == 'spark':
+        pass
+
+    plots = map(climsContoured, urlSplits)
+    print plots
+    return plots
+#    return makeMovie(plots, 'clim.mpg')    
+
+
+def climByAveraging(urls,                 # list of granule URLs for a time 
period
+                    variable,             # name of primary variable in file
+                    mask,                 # name of mask variable
+                    coordinates,          # names of coordinate arrays to read 
and pass on (e.g. 'lat' and 'lon')
+                    maskFn=qcMask,        # mask function to compute mask from 
mask variable
+                    averageFn=average     # averaging function to use
+                   ):
+    '''Compute a climatology over N arrays by applying a mask and averaging 
function.
+Returns the averaged variable grid, attributes of the primary variable, and 
the coordinate arrays in a dictionary.
+***Assumption:  This routine assumes that the N grids will fit in memory.***
+    '''
+    n = len(urls)
+    varList = [variable, mask]
+    for i, url in enumerate(urls):
+        fn = retrieveFile(url, '~/cache')
+        if VERBOSE: print >>sys.stderr, 'Read variables and mask ...'
+        var, fh = getVariables(fn, varList)            # return dict of 
variable objects by name
+        if i == 0:
+            dtype = var[variable].dtype
+            shape = (n,) + var[variable].shape
+            accum = N.ma.empty(shape, dtype)
+        
+        v = maskFn(var[variable], var[mask])           # apply quality mask 
variable to get numpy MA
+#        v = var[variable][:]
+        accum[i] = v                                   # accumulate N arrays 
for 'averaging'
+        if i+1 != len(urls):                           # keep var dictionary 
from last file to grab metadata
+            close(fh)                                  # REMEMBER:  closing fh 
loses in-memory data structures
+
+    if VERBOSE: print >>sys.stderr, 'Averaging ...'
+    coord, fh = getVariables(fn, coordinates)          # read coordinate 
arrays and add to dict
+    for c in coordinates: var[c] = coord[c][:]
+
+    if averageFn == average:
+        avg = averageFn(accum)                         # call averaging 
function
+    else:
+        var[variable] = accum
+        if averageFn == gaussInterp:
+            varNames = variable + coordinates
+            avg, vweight, status = \
+                gaussInterp(var, varNames, latGrid, lonGrid, wlat, wlon, slat, 
slon, stime, vfactor, missingValue)
+        
+    var['attributes'] = var[variable].__dict__         # save attributes of 
primary variable
+    var[variable] = avg                                # return primary 
variable & mask arrays in dict
+    var[mask] = N.ma.getmask(avg)
+
+#    close(fh)                                         # Can't close, lose 
netCDF4.Variable objects, leaking two fh
+    return var
+
+
+def contourMap(var, variable, coordinates, n, url):
+    p = urlparse.urlparse(url)
+    filename = os.path.split(p.path)[1]
+    return filename
+    outFile = filename + '.png'
+    # Downscale variable array (SST) before contouring, matplotlib is TOO slow 
on large arrays
+    vals = var[variable][:]
+
+    # Fixed color scale, write file, turn off auto borders, set title, reverse 
lat direction so monotonically increasing??
+    imageMap(var[coordinates[1]][:], var[coordinates[0]][:], var[variable][:],
+             vmin=-2., vmax=45., outFile=outFile, autoBorders=False,
+             title='%s %d-day Mean from %s' % (variable.upper(), n, filename))
+    print >>sys.stderr, 'Writing contour plot to %s' % outFile
+    return outFile
+
+
+def isLocalFile(url):
+    '''Check if URL is a local path.'''
+    u = urlparse.urlparse(url)
+    if u.scheme == '' or u.scheme == 'file':
+        if not path.exists(u.path):
+            print >>sys.stderr, 'isLocalFile: File at local path does not 
exist: %s' % u.path
+        return (True, u.path)
+    else:
+        return (False, u.path)
+
+
+def retrieveFile(url, dir=None):
+    '''Retrieve a file from a URL, or if it is a local path then verify it 
exists.'''
+    if dir is None: dir = './'
+    ok, path = isLocalFile(url)
+    fn = os.path.split(path)[1]
+    outPath = os.path.join(dir, fn)
+    if not ok:
+        if os.path.exists(outPath):
+            print >>sys.stderr, 'retrieveFile: Using cached file: %s' % outPath
+        else:
+            try:
+                print >>sys.stderr, 'retrieveFile: Retrieving (URL) %s to %s' 
% (url, outPath)
+                urllib.urlretrieve(url, outPath)
+            except:
+                print >>sys.stderr, 'retrieveFile: Cannot retrieve file at 
URL: %s' % url
+                return None
+    return outPath    
+
+
+def dailyFile2date(path, offset=1):
+    '''Convert YYYYDOY string in filename to date.'''
+    fn = os.path.split(path)[1]
+    year = int(fn[offset:offset+4])
+    doy = int(fn[offset+5:offset+8])
+    return fn[5:15].replace('.', '/')
+
+
+def formatRegion(r):
+    """Format lat/lon region specifier as string suitable for file name."""
+    if isinstance(r, str):
+        return r
+    else:
+        strs = [str(i).replace('-', 'm') for i in r]
+        return 'region-%s-%sby%s-%s' % tuple(strs)
+
+
+def formatGrid(r):
+    """Format lat/lon grid resolution specifier as string suitable for file 
name."""
+    if isinstance(r, str):
+        return r
+    else:
+        return str(r[0]) + 'by' + str(r[1])
+
+
+def main(args):
+    nEpochs = int(args[0])
+    nWindow = int(args[1])
+    averager = args[2]
+    mode = args[3]
+    nWorkers = int(args[4])
+    urlFile = args[5]
+    urls = [s.strip() for s in open(urlFile, 'r')]
+    return climByAveragingPeriods(urls, nEpochs, nWindow, 'sst', 'qual_sst', 
['lat', 'lon'],
+                                  averager=averager, mode=mode, 
nWorkers=nWorkers)
+
+    
+if __name__ == '__main__':
+    print main(sys.argv[1:])
+
+
+# python climatology.py 5 5 pixelAverage sequential 1 urls_sst_10days.txt
+# python climatology.py 5 5 gaussianInterp multicore 8 urls_sst_40days.txt
+

Reply via email to