http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/client/nexuscli/__init__.py ---------------------------------------------------------------------- diff --git a/client/nexuscli/__init__.py b/client/nexuscli/__init__.py new file mode 100644 index 0000000..1275c23 --- /dev/null +++ b/client/nexuscli/__init__.py @@ -0,0 +1,9 @@ +""" +Copyright (c) 2017 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" +from nexuscli.nexuscli import TimeSeries +from nexuscli.nexuscli import set_target +from nexuscli.nexuscli import time_series +from nexuscli.nexuscli import dataset_list +from nexuscli.nexuscli import daily_difference_average
"""
Copyright (c) 2017 Jet Propulsion Laboratory,
California Institute of Technology. All rights reserved

This module provides a native python client interface to the NEXUS
(https://github.com/dataplumber/nexus) webservice API.

Usage:

    import nexuscli

    nexuscli.set_target("http://nexus-webapp:8083")
    nexuscli.dataset_list()

"""
import requests
import numpy as np
from datetime import datetime
from collections import namedtuple, OrderedDict
from pytz import UTC

__pdoc__ = {}

TimeSeries = namedtuple('TimeSeries', ('dataset', 'time', 'mean', 'standard_deviation', 'count', 'minimum', 'maximum'))
TimeSeries.__doc__ = '''\
An object containing Time Series arrays.
'''
__pdoc__['TimeSeries.dataset'] = "Name of the Dataset"
__pdoc__['TimeSeries.time'] = "`numpy` array containing times as `datetime` objects"
__pdoc__['TimeSeries.mean'] = "`numpy` array containing means"
__pdoc__['TimeSeries.standard_deviation'] = "`numpy` array containing standard deviations"
__pdoc__['TimeSeries.count'] = "`numpy` array containing counts"
__pdoc__['TimeSeries.minimum'] = "`numpy` array containing minimums"
__pdoc__['TimeSeries.maximum'] = "`numpy` array containing maximums"

ISO_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

# Default NEXUS webapp endpoint; override with set_target().
target = 'http://localhost:8083'

# One shared Session so HTTP connections are re-used across requests.
# NOTE: requests.Session() is the documented spelling; requests.session() is a
# deprecated alias.
session = requests.Session()


def set_target(url):
    """
    Set the URL for the NEXUS webapp endpoint.

    __url__ URL for NEXUS webapp endpoint

    __return__ None
    """
    global target
    target = url


def _no_data_message(bounding_box, start_datetime, end_datetime, datasets):
    # Shared "empty result" message used by both time-series entry points
    # (previously duplicated in each function).
    return "No data found in {} between {} and {} for Datasets {}.".format(
        bounding_box.wkt,
        start_datetime.strftime(ISO_FORMAT),
        end_datetime.strftime(ISO_FORMAT),
        datasets)


def _epochs_to_datetimes(epoch_seconds):
    # Convert an iterable of epoch seconds to timezone-aware (UTC) datetimes.
    return np.array([datetime.utcfromtimestamp(t).replace(tzinfo=UTC) for t in epoch_seconds])


def dataset_list():
    """
    Get a list of datasets and the start and end time for each.

    __return__ list of datasets. Each entry in the list contains `shortName`, `start`, and `end`
    """
    response = session.get("{}/list".format(target))
    # Consistency fix: the other endpoints fail loudly on HTTP errors; this one
    # previously attempted to json-decode an error page.
    response.raise_for_status()
    data = response.json()

    list_response = []
    for dataset in data:
        # Server reports epoch milliseconds; convert to ISO-8601 strings.
        ordered_dict = OrderedDict()
        ordered_dict['shortName'] = dataset['shortName']
        ordered_dict['start'] = datetime.utcfromtimestamp(dataset['start'] / 1000).strftime(ISO_FORMAT)
        ordered_dict['end'] = datetime.utcfromtimestamp(dataset['end'] / 1000).strftime(ISO_FORMAT)
        list_response.append(ordered_dict)

    return list_response


def daily_difference_average(dataset, bounding_box, start_datetime, end_datetime):
    """
    Generate an anomaly Time series for a given dataset, bounding box, and timeframe.

    __dataset__ Name of the dataset as a String
    __bounding_box__ Bounding box for area of interest as a `shapely.geometry.polygon.Polygon`
    __start_datetime__ Start time as a `datetime.datetime`
    __end_datetime__ End time as a `datetime.datetime`

    __return__ List of `nexuscli.nexuscli.TimeSeries` namedtuples
    """
    url = "{}/dailydifferenceaverage_spark?".format(target)

    params = {
        'dataset': dataset,
        # The climatology dataset name is derived by naming convention.
        'climatology': "{}_CLIM".format(dataset),
        'b': ','.join(str(b) for b in bounding_box.bounds),
        'startTime': start_datetime.strftime(ISO_FORMAT),
        'endTime': end_datetime.strftime(ISO_FORMAT),
    }

    response = session.get(url, params=params)
    response.raise_for_status()
    response = response.json()

    data = np.array(response['data']).flatten()

    assert len(data) > 0, _no_data_message(bounding_box, start_datetime, end_datetime, dataset)

    time_series_result = []

    # All rows share the same keys; map key name -> column index once.
    key_to_index = {k: x for x, k in enumerate(data[0].keys())}

    time_series_data = np.array([tuple(entry.values()) for entry in data])

    if len(time_series_data) > 0:
        time_series_result.append(
            TimeSeries(
                dataset=dataset,
                time=_epochs_to_datetimes(time_series_data[:, key_to_index['time']]),
                mean=time_series_data[:, key_to_index['mean']],
                standard_deviation=time_series_data[:, key_to_index['std']],
                # The dailydifferenceaverage endpoint does not report these stats.
                count=None,
                minimum=None,
                maximum=None,
            )
        )

    return time_series_result


def time_series(datasets, bounding_box, start_datetime, end_datetime, spark=False):
    """
    Send a request to NEXUS to calculate a time series.

    __datasets__ Sequence (max length 2) of the name of the dataset(s)
    __bounding_box__ Bounding box for area of interest as a `shapely.geometry.polygon.Polygon`
    __start_datetime__ Start time as a `datetime.datetime`
    __end_datetime__ End time as a `datetime.datetime`
    __spark__ Optionally use spark. Default: `False`

    __return__ List of `nexuscli.nexuscli.TimeSeries` namedtuples
    """

    if isinstance(datasets, str):
        datasets = [datasets]

    assert 0 < len(datasets) <= 2, "datasets must be a sequence of 1 or 2 items"

    params = {
        'ds': ','.join(datasets),
        'b': ','.join(str(b) for b in bounding_box.bounds),
        'startTime': start_datetime.strftime(ISO_FORMAT),
        'endTime': end_datetime.strftime(ISO_FORMAT),
    }

    if spark:
        url = "{}/timeSeriesSpark?".format(target)
        params['spark'] = "mesos,16,32"  # spark master, executors, partitions
    else:
        url = "{}/stats?".format(target)

    response = session.get(url, params=params)
    response.raise_for_status()
    response = response.json()

    data = np.array(response['data']).flatten()

    assert len(data) > 0, _no_data_message(bounding_box, start_datetime, end_datetime, datasets)

    # PERF/BUGFIX: key_to_index was rebuilt on every iteration of the loop below
    # even though all rows share the same keys; hoist it out of the loop.
    key_to_index = {k: x for x, k in enumerate(data[0].keys())}

    time_series_result = []

    for i in range(0, len(response['meta'])):
        # 'ds' tags each row with the index of the dataset it belongs to.
        time_series_data = np.array([tuple(entry.values()) for entry in data if entry['ds'] == i])

        if len(time_series_data) > 0:
            time_series_result.append(
                TimeSeries(
                    dataset=response['meta'][i]['shortName'],
                    time=_epochs_to_datetimes(time_series_data[:, key_to_index['time']]),
                    mean=time_series_data[:, key_to_index['mean']],
                    standard_deviation=time_series_data[:, key_to_index['std']],
                    count=time_series_data[:, key_to_index['cnt']],
                    minimum=time_series_data[:, key_to_index['min']],
                    maximum=time_series_data[:, key_to_index['max']],
                )
            )

    return time_series_result
"""
Copyright (c) 2017 Jet Propulsion Laboratory,
California Institute of Technology. All rights reserved
"""
import unittest
from datetime import datetime

from shapely.geometry import box

import nexuscli


class TestCli(unittest.TestCase):
    """Integration tests that exercise a live NEXUS webapp endpoint."""

    # Region of interest shared by the tests: NE Pacific box.
    def test_time_series(self):
        region = box(-150, 45, -120, 60)
        series = nexuscli.time_series(("AVHRR_OI_L4_GHRSST_NCEI", "MEASURES_SLA_JPL_1603"),
                                      region,
                                      datetime(2016, 1, 1), datetime(2016, 12, 31))

        # One TimeSeries result is expected per requested dataset.
        self.assertEqual(2, len(series))

    def test_list(self):
        datasets = nexuscli.dataset_list()

        print(datasets)
        self.assertTrue(len(datasets) > 0)

    def test_daily_difference_average(self):
        region = box(-150, 45, -120, 60)
        series = nexuscli.daily_difference_average("AVHRR_OI_L4_GHRSST_NCEI",
                                                   region,
                                                   datetime(2013, 1, 1), datetime(2014, 12, 31))

        self.assertEqual(1, len(series))


# ===== client/requirements.txt (pinned client dependencies) =====
# certifi==2017.4.17
# chardet==3.0.4
# idna==2.5
# Mako==1.0.7
# Markdown==2.4.1
# MarkupSafe==1.0
# nexuscli==1.0
# numpy==1.13.1
# pdoc==0.3.2
# Pygments==2.2.0
# pytz==2017.2
# requests==2.18.1
# Shapely==1.5.17.post1
# urllib3==1.21.1

# ===== client/setup.py =====
"""
Copyright (c) 2017 Jet Propulsion Laboratory,
California Institute of Technology. All rights reserved
"""

from setuptools import setup, find_packages

__version__ = '1.0'

setup(
    name="nexuscli",
    version=__version__,
    packages=find_packages(),
    url="https://github.jpl.nasa.gov/thuang/nexus",

    author="Team Nexus",

    description="NEXUS Client Module",
    long_description=open('README.md').read(),

    platforms='any',

    install_requires=[
        'requests',
        'shapely',
        'numpy',
        'pytz'
    ],

    classifiers=[
        'Development Status :: 1 - Pre-Alpha',
        'Intended Audience :: Developers',
        'Operating System :: OS Independent',
        'Programming Language :: Python :: 2.7',
    ]
)

# ===== climatology/.gitignore =====
# .DS_Store
# dist
# build
# *.egg-info
# *_f.so*
# *.nc
+ +""" + +import sys, os, urlparse, urllib, re, time +import numpy as N +import matplotlib +matplotlib.use('Agg') +import matplotlib.pylab as M + +from variables import getVariables, close +from cache import retrieveFile, CachePath +from split import fixedSplit, splitByNDays +from netCDF4 import Dataset, default_fillvals +from pathos.multiprocessing import ProcessingPool as Pool +from plotlib import imageMap, makeMovie + +from spatialFilter import spatialFilter +from gaussInterp import gaussInterp # calls into Fortran version gaussInterp_f.so +#from gaussInterp_slow import gaussInterp_slow as gaussInterp # pure python, slow debuggable version + +#import pyximport; pyximport.install(pyimport=False) +#from gaussInterp import gaussInterp, gaussInterp_ # attempted cython version, had issues + +VERBOSE = 1 + +# Possible execution modes +# Multicore & cluster modes use pathos pool.map(); Spark mode uses PySpark cluster. +ExecutionModes = ['sequential', 'multicore', 'cluster', 'spark'] + +# SST L3m 4.6km Metadata +# SST calues are scaled integers in degrees Celsius, lon/lat is 8640 x 4320 +# Variable = 'sst', Mask = 'qual_sst', Coordinates = ['lon', 'lat'] + +# Generate algorithmic name for N-day Climatology product +SSTClimatologyTemplate = 'SST.L3.Global.Clim.%(period)s.%(date)s.%(version)s.nc' #?? + +# Simple mask and average functions to get us started, then add gaussian interpolation. 
+# MODIS L3 SST product, qual_sst is [-1, 2] - =0 is best data, can add =1 for better coverage +#def qcMask(var, mask): return N.ma.array(var, mask=N.ma.make_mask(mask == 0)) + +def qcMask(var, mask): return N.ma.masked_where(mask != 0, var) +#def qcMask(var, mask): return N.ma.masked_where(mask < 0, var) + +def splitModisSst(seq, n): + for chunk in splitByNDays(seq, n, re.compile(r'(...).L3m')): + yield chunk + +def mean(a): return N.ma.mean(a, axis=0) + +AveragingFunctions = {'pixelMean': mean, 'gaussInterp': gaussInterp, 'spatialFilter': spatialFilter} + +PixelMeanConfig = {'name': 'pixelMean'} + +GaussInterpConfig = {'name': 'gaussInterp', + 'latGrid': None, 'lonGrid': None, # None means use input lat/lon grid + 'wlat': 3, 'wlon': 3, + 'slat': 0.15, 'slon': 0.15, 'stime': 1, + 'vfactor': -0.6931, 'missingValue': default_fillvals['f4']} + +GaussInterpConfig1a = {'name': 'gaussInterp', + 'latGrid': None, 'lonGrid': None, # None means use input lat/lon grid + 'wlat': 0.30, 'wlon': 0.30, + 'slat': 0.15, 'slon': 0.15, 'stime': 1, + 'vfactor': -0.6931, 'missingValue': default_fillvals['f4']} + +GaussInterpConfig1b = {'name': 'gaussInterp', + 'latGrid': None, 'lonGrid': None, # None means use input lat/lon grid + 'wlat': 0.08, 'wlon': 0.08, + 'slat': 0.15, 'slon': 0.15, 'stime': 1, + 'vfactor': -0.6931, 'missingValue': default_fillvals['f4']} + +GaussInterpConfig2 = {'name': 'gaussInterp', + 'latGrid': (89.5, -89.5, -0.25), 'lonGrid': (-180., 179., 0.25), + 'wlat': 2., 'wlon': 2., + 'slat': 0.15, 'slon': 0.15, 'stime': 1, + 'vfactor': -0.6931, 'missingValue': default_fillvals['f4']} + +FilterGaussian = [[1, 2, 1], [2, 4, 2], [1, 2, 1]] # divide by 16 +FilterLowPass = [[1, 1, 1], [1, 1, 1], [1, 1, 1]] # divide by 9 + +SpatialFilterConfig1 = {'name': 'spatialFilter', 'normalization': 16., + 'spatialFilter': N.array(FilterGaussian, dtype=N.int32), + 'missingValue': default_fillvals['f4']} + +SpatialFilterConfig2 = {'name': 'spatialFilter', 'normalization': 9., + 
'spatialFilter': N.array(FilterLowPass, dtype=N.int32), + 'missingValue': default_fillvals['f4']} + +# Directory to cache retrieved files in +CachePath = '~/cache' + + +def climByAveragingPeriods(urls, # list of (daily) granule URLs for a long time period (e.g. a year) + nEpochs, # compute a climatology for every N epochs (days) by 'averaging' + nWindow, # number of epochs in window needed for averaging + nNeighbors, # number of neighbors on EACH side in lat/lon directions to use in averaging + variable, # name of primary variable in file + mask, # name of mask variable + coordinates, # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon') + splitFn=splitModisSst, # split function to use to partition the input URL list + maskFn=qcMask, # mask function to compute mask from mask variable + averager='pixelMean', # averaging function to use, one of ['pixelMean', 'gaussInterp', 'spatialFilter'] + averagingConfig={}, # dict of parameters to control the averaging function (e.g. gaussInterp) + optimization='fortran', # optimization mode (fortran or cython) + mode='sequential', # Map across time periods of N-days for concurrent work, executed by: + # 'sequential' map, 'multicore' using pool.map(), 'cluster' using pathos pool.map(), + # or 'spark' using PySpark + numNodes=1, # number of cluster nodes to use + nWorkers=4, # number of parallel workers per node + averagingFunctions=AveragingFunctions, # dict of possible averaging functions + legalModes=ExecutionModes, # list of possible execution modes + cachePath=CachePath # directory to cache retrieved files in + ): + '''Compute a climatology every N days by applying a mask and averaging function. +Writes the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary. 
+***Assumption: This routine assumes that the N grids will fit in memory.*** + ''' + if averagingConfig['name'] == 'gaussInterp': + averagingConfig['wlat'] = nNeighbors + averagingConfig['wlon'] = nNeighbors + try: + averageFn = averagingFunctions[averager] + except: + print >>sys.stderr, 'climatology: Error, Averaging function must be one of: %s' % str(averagingFunctions) + sys.exit(1) + + urlSplits = [s for s in splitFn(urls, nEpochs)] + + def climsContoured(urls, plot=None, fillValue=default_fillvals['f4'], format='NETCDF4', cachePath=cachePath): + n = len(urls) + if VERBOSE: print >>sys.stderr, urls + + var = climByAveraging(urls, variable, mask, coordinates, maskFn, averageFn, averagingConfig, optimization, cachePath) + + fn = os.path.split(urls[0])[1] + inFile = os.path.join(cachePath, fn) + method = averagingConfig['name'] + fn = os.path.splitext(fn)[0] + day = fn[5:8] + nDays = int(var['time'][0]) + + if 'wlat' in averagingConfig: + wlat = averagingConfig['wlat'] + else: + wlat = 1 + if int(wlat) == wlat: + outFile = 'A%s.L3m_%dday_clim_sst_4km_%s_%dnbrs.nc' % (day, nDays, method, int(wlat)) # mark each file with first day in period + else: + outFile = 'A%s.L3m_%dday_clim_sst_4km_%s_%4.2fnbrs.nc' % (day, nDays, method, wlat) # mark each file with first day in period + + outFile = writeOutNetcdfVars(var, variable, mask, coordinates, inFile, outFile, fillValue, format) + + if plot == 'contour': + figFile = contourMap(var, variable, coordinates, n, outFile) + elif plot == 'histogram': +# figFile = histogram(var, variable, n, outFile) + figFile = None + else: + figFile = None + return (outFile, figFile) + + if mode == 'sequential': + results = map(climsContoured, urlSplits) + elif mode == 'multicore': + pool = Pool(nWorkers) + results = pool.map(climsContoured, urlSplits) + elif mode == 'cluster': + pass + elif mode == 'spark': + pass + + return results + + +def climByAveraging(urls, # list of granule URLs for a time period + variable, # name of primary 
variable in file + mask, # name of mask variable + coordinates, # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon') + maskFn=qcMask, # mask function to compute mask from mask variable + averageFn=mean, # averaging function to use + averagingConfig={}, # parameters to control averaging function (e.g. gaussInterp) + optimization='fortran', # optimization mode (fortran or cython) + cachePath=CachePath + ): + '''Compute a climatology over N arrays by applying a mask and averaging function. +Returns the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary. +***Assumption: This routine assumes that the N grids will fit in memory.*** + ''' + n = len(urls) + varList = [variable, mask] + var = {} + vtime = N.zeros((n,), N.int32) + + for i, url in enumerate(urls): + try: + path = retrieveFile(url, cachePath) + fn = os.path.split(path)[1] + vtime[i] = int(fn[5:8]) # KLUDGE: extract DOY from filename + except: + print >>sys.stderr, 'climByAveraging: Error, continuing without file %s' % url + accum[i] = emptyVar + continue + if path is None: continue + print >>sys.stderr, 'Read variables and mask ...' + try: + var, fh = getVariables(path, varList, arrayOnly=True, order='F', set_auto_mask=False) # return dict of variable objects by name + except: + print >>sys.stderr, 'climByAveraging: Error, cannot read file %s' % path + accum[i] = emptyVar + continue + if i == 0: + dtype = var[variable].dtype + if 'int' in dtype.name: dtype = N.float32 + shape = (n,) + var[variable].shape + accum = N.ma.empty(shape, dtype, order='F') + emptyVar = N.array(N.ma.masked_all(var[variable].shape, dtype), order='F') # totally masked variable array for missing or bad file reads + + print >>sys.stderr, 'Read coordinates ...' 
+ var, fh = getVariables(path, coordinates, var, arrayOnly=True, order='F') # read coordinate arrays and add to dict + + var[variable] = maskFn(var[variable], var[mask]) # apply quality mask variable to get numpy MA, turned off masking done by netCDF4 library +# var[variable] = var[variable][:] + + # Echo variable range for sanity check + vals = var[variable].compressed() + print >>sys.stderr, 'Variable Range: min, max:', vals.min(), vals.max() + + # Plot input grid +# figFile = histogram(vals, variable, n, os.path.split(path)[1]) +# figFile = contourMap(var, variable, coordinates[1:], n, os.path.split(path)[1]) + + accum[i] = var[variable] # accumulate N arrays for 'averaging""" +# if i != 0 and i+1 != n: close(fh) # REMEMBER: closing fh loses netCDF4 in-memory data structures + close(fh) + + coordinates = ['time'] + coordinates # add constructed time (days) as first coordinate + var['time'] = vtime + + if averagingConfig['name'] == 'pixelMean': + print >>sys.stderr, 'Doing Pixel Average over %d grids ...' % n + start = time.time() + avg = averageFn(accum) # call pixel averaging function + end = time.time() + print >>sys.stderr, 'pixelMean execution time:', (end - start) + outlat = var[coordinates[1]].astype(N.float32)[:] + outlon = var[coordinates[2]].astype(N.float32)[:] + elif averagingConfig['name'] == 'gaussInterp': + print >>sys.stderr, 'Doing Gaussian Interpolation over %d grids ...' 
% n + var[variable] = accum + c = averagingConfig + latGrid = c['latGrid']; lonGrid = c['lonGrid'] + if latGrid is not None and lonGrid is not None: + outlat = N.arange(latGrid[0], latGrid[1]+latGrid[2], latGrid[2], dtype=N.float32, order='F') + outlon = N.arange(lonGrid[0], lonGrid[1]+lonGrid[2], lonGrid[2], dtype=N.float32, order='F') + else: + outlat = N.array(var[coordinates[1]], dtype=N.float32, order='F') + outlon = N.array(var[coordinates[2]], dtype=N.float32, order='F') + varNames = [variable] + coordinates + start = time.time() + avg, weight, status = \ + gaussInterp(var, varNames, outlat, outlon, c['wlat'], c['wlon'], + c['slat'], c['slon'], c['stime'], c['vfactor'], c['missingValue'], + VERBOSE, optimization) + end = time.time() + var['outweight'] = weight.astype(N.float32) + print >>sys.stderr, 'gaussInterp execution time:', (end - start) + elif averagingConfig['name'] == 'spatialFilter': + print >>sys.stderr, 'Applying Spatial 3x3 Filter and then averaging over %d grids ...' % n + var[variable] = accum + c = averagingConfig + varNames = [variable] + coordinates + start = time.time() + avg, count, status = \ + spatialFilter(var, varNames, c['spatialFilter'], c['normalization'], + c['missingValue'], VERBOSE, optimization) + end = time.time() + print >>sys.stderr, 'spatialFilter execution time:', (end - start) + outlat = var[coordinates[1]].astype(N.float32)[:] + outlon = var[coordinates[2]].astype(N.float32)[:] + + var['out'+variable] = avg.astype(N.float32) # return primary variable & mask arrays in dict + var['out'+mask] = N.ma.getmask(avg) + var['outlat'] = outlat + var['outlon'] = outlon + return var + + +def writeOutNetcdfVars(var, variable, mask, coordinates, inFile, outFile, fillValue=None, format='NETCDF4'): + '''Construct output bundle of arrays with NetCDF dimensions and attributes. +Output variables and attributes will have same names as the input file. 
+ ''' + din = Dataset(inFile, 'r') + dout = Dataset(outFile, 'w', format=format) + print >>sys.stderr, 'Writing %s ...' % outFile + + # Transfer global attributes from input file + for a in din.ncattrs(): + dout.setncattr(a, din.getncattr(a)) + + # Add dimensions and variables, copying data + coordDim = [dout.createDimension(coord, var['out'+coord].shape[0]) for coord in coordinates] # here output lon, lat, etc. + for coord in coordinates: + v = dout.createVariable(coord, var['out'+coord].dtype, (coord,)) + v[:] = var['out'+coord][:] + primaryVar = dout.createVariable(variable, var['out'+variable].dtype, coordinates, fill_value=fillValue) + primaryVar[:] = var['out'+variable][:] # transfer array + maskVar = dout.createVariable(mask, 'i1', coordinates) + maskVar[:] = var['out'+mask].astype('i1')[:] + + # Transfer variable attributes from input file + for k,v in dout.variables.iteritems(): + for a in din.variables[k].ncattrs(): + if a == 'scale_factor' or a == 'add_offset' or a == '_FillValue': continue + v.setncattr(a, din.variables[k].getncattr(a)) + if k == variable: + try: +# if fillValue == None: fillValue = din.variables[k].getncattr('_FillValue') # total kludge + if fillValue == None: fillValue = default_fillvals['f4'] +# print >>sys.stderr, default_fillvals +# v.setncattr('_FillValue', fillValue) # set proper _FillValue for climatology array + v.setncattr('missing_value', fillValue) + print >>sys.stderr, 'Setting missing_value for primary variable %s to %f' % (variable, fillValue) + except: + print >>sys.stderr, 'writeOutNetcdfVars: Warning, for variable %s no fill value specified or derivable from inputs.' 
% variable + din.close() + dout.close() + return outFile + + +def contourMap(var, variable, coordinates, n, outFile): + figFile = os.path.splitext(outFile)[0] + '_hist.png' + # TODO: Downscale variable array (SST) before contouring, matplotlib is TOO slow on large arrays + vals = var[variable][:] + + # Fixed color scale, write file, turn off auto borders, set title, reverse lat direction so monotonically increasing?? + imageMap(var[coordinates[1]][:], var[coordinates[0]][:], var[variable][:], + vmin=-2., vmax=45., outFile=figFile, autoBorders=False, + title='%s %d-day Mean from %s' % (variable.upper(), n, outFile)) + print >>sys.stderr, 'Writing contour plot to %s' % figFile + return figFile + + +def histogram(vals, variable, n, outFile): + figFile = os.path.splitext(outFile)[0] + '_hist.png' + M.clf() +# M.hist(vals, 47, (-2., 45.)) + M.hist(vals, 94) + M.xlim(-5, 45) + M.xlabel('SST in degrees Celsius') + M.ylim(0, 300000) + M.ylabel('Count') + M.title('Histogram of %s %d-day Mean from %s' % (variable.upper(), n, outFile)) + M.show() + print >>sys.stderr, 'Writing histogram plot to %s' % figFile + M.savefig(figFile) + return figFile + + +def dailyFile2date(path, offset=1): + '''Convert YYYYDOY string in filename to date.''' + fn = os.path.split(path)[1] + year = int(fn[offset:offset+4]) + doy = int(fn[offset+5:offset+8]) + return fn[5:15].replace('.', '/') + + +def formatRegion(r): + """Format lat/lon region specifier as string suitable for file name.""" + if isinstance(r, str): + return r + else: + strs = [str(i).replace('-', 'm') for i in r] + return 'region-%s-%sby%s-%s' % tuple(strs) + + +def formatGrid(r): + """Format lat/lon grid resolution specifier as string suitable for file name.""" + if isinstance(r, str): + return r + else: + return str(r[0]) + 'by' + str(r[1]) + + +def main(args): + nEpochs = int(args[0]) + nWindow = int(args[1]) + nNeighbors = int(args[2]) + averager = args[3] + optimization = args[4] + mode = args[5] + nWorkers = int(args[6]) + 
urlFile = args[7] + urls = [s.strip() for s in open(urlFile, 'r')] + if averager == 'gaussInterp': + results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'], + averager=averager, optimization=optimization, averagingConfig=GaussInterpConfig, + mode=mode, nWorkers=nWorkers) + elif averager == 'spatialFilter': + results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'], + averager=averager, optimization=optimization, averagingConfig=SpatialFilterConfig1, + mode=mode, nWorkers=nWorkers) + elif averager == 'pixelMean': + results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'], + averager=averager, optimization=optimization, averagingConfig=PixelMeanConfig, + mode=mode, nWorkers=nWorkers) + else: + print >>sys.stderr, 'climatology2: Error, averager must be one of', AveragingFunctions.keys() + sys.exit(1) + + if results[0][1] is not None: + makeMovie([r[1] for r in results], 'clim.mpg') + return results + + +if __name__ == '__main__': + print main(sys.argv[1:]) + + +# Old Tests: +# python climatology2.py 5 5 0 pixelMean fortran sequential 1 urls_sst_10days.txt +# python climatology2.py 5 5 3 gaussInterp fortran sequential 1 urls_sst_10days.txt + +# python climatology2.py 5 5 0 pixelMean fortran sequential 1 urls_sst_40days.txt +# python climatology2.py 5 5 0 pixelMean fortran multicore 8 urls_sst_40days.txt +# python climatology2.py 5 5 3 gaussInterp fortran multicore 8 urls_sst_40days.txt + +# Old Production: +# python climatology2.py 5 5 0 pixelMean fortran multicore 16 urls_sst_2015.txt >& log & +# python climatology2.py 10 10 0 pixelMean fortran multicore 16 urls_sst_2015.txt >& log & +# python climatology2.py 5 5 3 gaussInterp fortran multicore 16 urls_sst_2015.txt >& log & + + +# Tests: +# python climatology2.py 5 5 0 pixelMean fortran sequential 1 urls_sst_daynight_5days_sorted.txt +# python climatology2.py 5 5 0 pixelMean 
fortran multicore 4 urls_sst_daynight_5days_sorted.txt +# python climatology2.py 5 5 3 gaussInterp fortran sequential 1 urls_sst_daynight_5days_sorted.txt +# python climatology2.py 5 5 3 gaussInterp fortran multicore 4 urls_sst_daynight_5days_sorted.txt +# python climatology2.py 5 7 1 spatialFilter fortran sequential 1 urls_sst_daynight_5days_sorted.txt + +# Test number of neighbors needed: +# python climatology2.py 5 7 3 gaussInterp fortran multicore 4 urls_sst_daynight_20days_sorted.txt + + +# Production: +# python climatology2.py 5 7 0 pixelMean fortran multicore 4 urls_sst_daynight_2003_2015_sorted.txt +# python climatology2.py 5 7 3 gaussInterp fortran sequential 1 urls_sst_daynight_2003_2015_sorted.txt +# python climatology2.py 5 7 3 gaussInterp fortran multicore 4 urls_sst_daynight_2003_2015_sorted.txt +# python climatology2.py 5 7 1 spatialFilter fortran multicore 4 urls_sst_daynight_2003_2015_sorted.txt + http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/ClimatologySpark2.py ---------------------------------------------------------------------- diff --git a/climatology/clim/ClimatologySpark2.py b/climatology/clim/ClimatologySpark2.py new file mode 100755 index 0000000..64b0510 --- /dev/null +++ b/climatology/clim/ClimatologySpark2.py @@ -0,0 +1,636 @@ +""" +ClimatologySpark2.py + +Compute a multi-epoch (multi-day) climatology from daily SST Level-3 grids. + +Simple code to be run on Spark cluster, or using multi-core parallelism on single machine via pysparkling. 
+ +""" + +import sys, os, urlparse, urllib, re, time, glob +import numpy as N +import matplotlib + +matplotlib.use('Agg') +import matplotlib.pylab as M + +from clim.variables import getVariables, close +from clim.cache import retrieveFile, CachePath, hdfsCopyFromLocal +from netCDF4 import Dataset, default_fillvals +from clim.plotlib import imageMap, makeMovie + +from clim.spatialFilter import spatialFilter +from clim.gaussInterp import gaussInterp # calls into Fortran version gaussInterp_f.so +# from clim.gaussInterp_slow import gaussInterp_slow as gaussInterp # pure python, slow debuggable version + +from clim.datasets import DatasetList, ModisSst, ModisChlor, MeasuresSsh +from clim.sort import sortByKeys +from clim.split import groupByKeys, splitByNDaysKeyed + +from pyspark import SparkContext, SparkConf + +DaskClientEndpoint = "daskclient:8786" + +VERBOSE = 0 +CollectAndTime = 0 + +# Possible execution modes +# Multicore & cluster modes use pathos pool.map(); Spark mode uses PySpark cluster. +ExecutionModes = ['spark', 'mesos', 'multicore'] + + +# SST L3m 4.6km Metadata +# SST calues are scaled integers in degrees Celsius, lon/lat is 8640 x 4320 +# Variable = 'sst', Mask = 'qual_sst', Coordinates = ['lon', 'lat'] + +# Simple mask and average functions to get us started, then add gaussian interpolation. 
def qcMask(var, mask):
    '''Apply a quality mask to a data grid.

    Cells where the quality variable is nonzero are masked out; qual_sst == 0
    marks the best data in the MODIS L3 SST product.
    '''
    badCells = (mask != 0)
    return N.ma.masked_where(badCells, var)
'spatialFilter': spatialFilter} +AveragingConfigs = {'pixelMean': PixelMeanConfig, 'gaussInterp': GaussInterpConfig1a, + 'spatialFilter': SpatialFilterConfig1} + + +def climByAveragingPeriods(urls, # list of (daily) granule URLs for a long time period (e.g. 15 years), *SORTED* by DOY + datasetInfo, # class holding dataset metadata + nEpochs, # compute a climatology for every N epochs (days) by 'averaging' + nWindow, # number of epochs in window needed for averaging + variable, # name of primary variable in file + mask, # name of mask variable + coordinates, # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon') + reader, # reader function that reads the variable from the file into a numpy masked array + outHdfsPath, + # HDFS path to write outputs to (i.e. netCDF file containing stats and optional contour plot) + splitter, # split function to use to partition the input URL list + masker=qcMask, # mask function to compute mask from mask variable + averager='pixelMean', + # averaging function to use, one of ['pixelMean', 'gaussInterp', 'spatialFilter'] + averagingConfig={}, + # dict of parameters to control the averaging function (e.g. gaussInterp) + sparkConfig='mesos,8,16', + # Map across time periods of N-days for concurrent work, executed by: + # 'spark to use PySpark or 'multicore' using pysparkling + # 'spark,n,m' means use N executors and M partitions + averagingFunctions=AveragingFunctions, # dict of possible averaging functions + legalModes=ExecutionModes, # list of possible execution modes + cachePath=CachePath # directory to cache retrieved files in + ): + '''Compute a climatology every N days by applying a mask and averaging function. +Writes the climatology average, attributes of the primary variable, and the coordinate arrays to a series of netCDF files. +Optionally generates a contour plot of each N-day climatology for verification. 
+ ''' + try: + averageFn = averagingFunctions[averager] + except: + print >> sys.stderr, 'climatology: Error, Averaging function must be one of: %s' % str(averagingFunctions) + sys.exit(1) + if 'accumulators' in averagingConfig: + accumulators = averagingConfig['accumulators'] + + # Split daily files into N-day periods for parallel tasks (e.g. 5-day clim means 73 periods) + # Keyed by integer DOY + urls = sortByKeys(urls, datasetInfo.getKeys) + urlSplits = splitter(urls, nEpochs) + urlSplits = groupByKeys(urlSplits) + print >> sys.stderr, 'Number of URL splits = ', len(urlSplits) + if VERBOSE: print >> sys.stderr, urlSplits + if len(urlSplits) == 0: sys.exit(1) + + # Compute per-pixel statistics in parallel + with Timer("Parallel Stats"): + if sparkConfig.startswith('dask'): + outputFiles = parallelStatsDaskSimple(urlSplits, datasetInfo, nEpochs, variable, mask, coordinates, reader, + outHdfsPath, + averagingConfig, sparkConfig, accumulators) + else: + outputFiles = parallelStatsSparkSimple(urlSplits, datasetInfo, nEpochs, variable, mask, coordinates, reader, + outHdfsPath, + averagingConfig, sparkConfig, accumulators) + return outputFiles + + +def parallelStatsSparkSimple(urlSplits, ds, nEpochs, variable, mask, coordinates, reader, outHdfsPath, averagingConfig, + sparkConfig, + accumulators=['count', 'mean', 'M2', 'min', 'max']): + '''Compute N-day climatology statistics in parallel using PySpark or pysparkling.''' + with Timer("Configure Spark"): + sparkContext, numExecutors, numPartitions = configureSpark(sparkConfig, appName='parallelClimStats') + + urlsRDD = sparkContext.parallelize(urlSplits, numPartitions) # partition N-day periods into tasks + + with CollectTimer("Map-reduce over %d partitions" % numPartitions): # map to update accumulators in parallel + accum = urlsRDD.map(lambda urls: accumulate(urls, variable, mask, coordinates, reader, accumulators)) + + with CollectTimer("Compute merged statistics"): + stats = accum.map(statsFromAccumulators) # compute 
def parallelStatsPipeline(urls, ds, nEpochs, variable, mask, coordinates, reader, averagingConfig, outHdfsPath,
                          accumulators=['count', 'mean', 'M2', 'min', 'max']):
    '''Run the full per-split pipeline as one callable.

    Chains accumulate -> statsFromAccumulators -> writeAndPlot so a single
    task (e.g. one Dask future) produces the output file for one N-day split.
    Returns the output file path from writeAndPlot.
    '''
    accum = accumulate(urls, variable, mask, coordinates, reader, accumulators)
    stats = statsFromAccumulators(accum)
    outputFile = writeAndPlot(stats, ds, variable, coordinates, nEpochs, averagingConfig, outHdfsPath, plot=False)
    return outputFile
averagingConfig, + sparkConfig, + accumulators=['count', 'mean', 'M2', 'min', 'max']): + '''Compute N-day climatology statistics in parallel using PySpark or pysparkling.''' + if not sparkConfig.startswith('dask,'): + print >> sys.stderr, "dask: configuration must be of form 'dask,n'" + sys.exit(1) + numPartitions = int(sparkConfig.split(',')[1]) + + with Timer("Configure Dask distributed"): + from distributed import Client, as_completed + client = Client(DaskClientEndpoint) + + print >> sys.stderr, 'Starting parallel Stats using Dask . . .' + start = time.time() + futures = client.map( + lambda urls: parallelStatsPipeline(urls, ds, nEpochs, variable, mask, coordinates, reader, averagingConfig, + outHdfsPath, accumulators), urlSplits) + + outputFiles = [] + for future in as_completed(futures): + outputFile = future.result() + outputFiles.append(outputFile) + end = time.time() + print >> sys.stderr, "parallelStats: Completed %s in %0.3f seconds." % (outputFile, (end - start)) + return outputFiles + + +def writeAndPlot(stats, ds, variable, coordinates, nEpochs, averagingConfig, outHdfsPath, plot=False): + '''Write statistics to a netCDF4 file and optionally make a contour plot.''' + key, stats = stats + doy = int(key) + urls = stats['_meta']['urls'] + # outFile = 'A%03d.L3m_%s_%dday_clim_%s.nc' % (doy, variable, nEpochs, averagingConfig['name']) # mark each file with first day in period + outFile = ds.genOutputName(doy, variable, nEpochs, averagingConfig) + firstInputFile = retrieveFile(urls[0]) + + outFile = writeStats(stats, firstInputFile, variable, coordinates, outFile) # write to netCDF4 file + outHdfsFile = hdfsCopyFromLocal(outFile, outHdfsPath) + + if plot: + coords = readCoordinates(firstInputFile, coordinates) + plotFile = contourMap(stats['mean'], variable, coords, nEpochs, outFile + '.png') + plotHdfsFile = hdfsCopyFromLocal(plotFile, outHdfsPath) + return (outHdfsFile, plotHdfsFile) + else: + return outHdfsFile + + +def configureSpark(sparkConfig, 
appName, memoryPerExecutor='4G', coresPerExecutor=1): + mode, numExecutors, numPartitions = sparkConfig.split(',') + numExecutors = int(numExecutors) + print >> sys.stderr, 'numExecutors = ', numExecutors + numPartitions = int(numPartitions) + print >> sys.stderr, 'numPartitions = ', numPartitions + if mode == 'multicore': + print >> sys.stderr, 'Using pysparkling' + import pysparkling + sc = pysparkling.Context() + else: + print >> sys.stderr, 'Using PySpark' + sparkMaster = mode + spConf = SparkConf() + spConf.setAppName(appName) + spConf.set("spark.executorEnv.HOME", + os.path.join(os.getenv('HOME'), 'spark_exec_home')) + spConf.set("spark.executorEnv.PYTHONPATH", os.getcwd()) + spConf.set("spark.executor.memory", memoryPerExecutor) + print >> sys.stderr, 'memoryPerExecutor = ', memoryPerExecutor + try: + sparkMaster = SparkMasterOverride + except: + pass + if sparkMaster[:5] == "mesos": + spConf.set("spark.cores.max", numExecutors) + else: + # Spark master is YARN or local[N] + spConf.set("spark.executor.instances", numExecutors) + spConf.set("spark.executor.cores", coresPerExecutor) + spConf.setMaster(sparkMaster) + sc = SparkContext(conf=spConf) + return sc, numExecutors, numPartitions + + +def readAndMask(url, variable, mask=None, cachePath=CachePath, hdfsPath=None): + '''Read a variable from a netCDF or HDF file and return a numpy masked array. +If the URL is remote or HDFS, first retrieve the file into a cache directory. 
+ ''' + v = None + if mask: + variables = [variable, mask] + else: + variables = [variable] + try: + path = retrieveFile(url, cachePath, hdfsPath) + except: + print >> sys.stderr, 'readAndMask: Error, continuing without file %s' % url + return v + + try: + print >> sys.stderr, 'Reading variable %s from %s' % (variable, path) + var, fh = getVariables(path, variables, arrayOnly=True, + set_auto_mask=True) # return dict of variable objects by name + v = var[ + variable] # could be masked array + if v.shape[0] == 1: v = v[0] # throw away trivial time dimension for CF-style files + if VERBOSE: print >> sys.stderr, 'Variable range: %fs to %f' % (v.min(), v.max()) + close(fh) + except: + print >> sys.stderr, 'readAndMask: Error, cannot read variable %s from file %s' % (variable, path) + + return v + + +def readCoordinates(path, coordinates=['lat', 'lon']): + '''Read coordinate arrays from local netCDF file.''' + var, fh = getVariables(path, coordinates, arrayOnly=True, set_auto_mask=True) + close(fh) + return [var[k] for k in coordinates] + + +def accumulate(urls, variable, maskVar, coordinates, reader=readAndMask, + accumulators=['count', 'mean', 'M2', 'min', 'max'], cachePath=CachePath, hdfsPath=None): + '''Accumulate data into statistics accumulators like count, sum, sumsq, min, max, M3, M4, etc.''' + keys, urls = urls + print >> sys.stderr, 'Updating accumulators %s for key %s' % (str(accumulators), str(keys)) + accum = {} + accum['_meta'] = {'urls': urls, 'coordinates': coordinates} + for i, url in enumerate(urls): + v = reader(url, variable, maskVar, cachePath, hdfsPath) + if v is None: continue + + if i == 0: + for k in accumulators: + if k[0] == '_': continue + if k == 'min': + accum[k] = default_fillvals['f8'] * N.ones(v.shape, dtype=N.float64) + elif k == 'max': + accum[k] = -default_fillvals['f8'] * N.ones(v.shape, dtype=N.float64) + elif k == 'count': + accum[k] = N.zeros(v.shape, dtype=N.int64) + else: + accum[k] = N.zeros(v.shape, dtype=N.float64) + + if 
N.ma.isMaskedArray(v): + if 'count' in accumulators: + accum['count'] += ~v.mask + if 'min' in accumulators: + accum['min'] = N.ma.minimum(accum['min'], v) + if 'max' in accumulators: + accum['max'] = N.ma.maximum(accum['max'], v) + + v = N.ma.filled(v, 0.) + else: + if 'count' in accumulators: + accum['count'] += 1 + if 'min' in accumulators: + accum['min'] = N.minimum(accum['min'], v) + if 'max' in accumulators: + accum['max'] = N.maximum(accum['max'], v) + + if 'mean' in accumulators: + n = accum['count'] + # mask = N.not_equal(n, 0) + # subtract running mean from new values, eliminate roundoff errors + # delta = N.choose(mask, (0, v - accum['mean'])) + delta = v - accum['mean'] + # delta_n = N.choose(mask, (0, delta/n)) + delta_n = delta / n + delta_n = N.nan_to_num(delta_n) # set to zero if n=0 caused a NaN + accum['mean'] += delta_n + if 'M2' in accumulators: + term = delta * delta_n * (n - 1) + accum['M2'] += term + return (keys, accum) + + +def combine(a, b): + '''Combine accumulators by summing.''' + print >> sys.stderr, 'Combining accumulators . . .' 
def statsFromAccumulators(accum):
    '''Turn a (key, accumulator-dict) pair into a (key, statistics-dict) pair.

    Cells whose count is zero (or negative) are masked in every returned array.
    The sample standard deviation is derived from the second central moment M2.
    '''
    sys.stderr.write('Computing statistics from accumulators . . .\n')
    keys, acc = accum

    # Mask every accumulator array wherever the count is <= 0.
    if 'count' in acc:
        acc['count'] = N.ma.masked_less_equal(acc['count'], 0, copy=False)
        zeroMask = acc['count'].mask
        for name in acc:
            if name.startswith('_') or name == 'count':
                continue
            acc[name] = N.ma.array(acc[name], copy=False, mask=zeroMask)

    # Pass the simple accumulators straight through.
    stats = {}
    for name in ('_meta', 'count', 'min', 'max', 'mean'):
        if name in acc:
            stats[name] = acc[name]
    # Sample (n-1 denominator) standard deviation from M2.
    if 'M2' in acc:
        stats['stddev'] = N.sqrt(acc['M2'] / (acc['count'] - 1))
    return (keys, stats)
def writeStats(stats, inputFile, variable, coordinates, outFile, format='NETCDF4', cachePath='cache'):
    '''Write the stats arrays out to a netCDF file with basic attributes.

    Coordinate names are taken from the primary variable's 'coordinates'
    attribute in the input file when it names lat/lon; otherwise the
    `coordinates` argument (or a ('lat', 'lon') fallback) is used.
    Returns the path of the file written.
    '''
    if os.path.exists(outFile):
        os.unlink(outFile)
    dout = Dataset(outFile, 'w', format=format)
    sys.stderr.write('Writing stats for variable %s to %s ...\n' % (variable, outFile))
    sys.stderr.write('Shape: %s\n' % str(stats['mean'].shape))
    dout.setncattr('variable', variable)
    dout.setncattr('urls', str(stats['_meta']['urls']))

    din = Dataset(inputFile, 'r')

    sys.stderr.write('Using coordinates & attributes from %s\n' % inputFile)
    coordinatesSave = coordinates
    try:
        # BUG FIX: this name was misspelled 'coorindatesFromFile' below, which
        # raised NameError on every call and silently fell into the except
        # branch, discarding the coordinates read from the input file.
        coordinatesFromFile = din.variables[variable].getncattr('coordinates')
        if 'lat' in coordinatesFromFile.lower() and 'lon' in coordinatesFromFile.lower():
            coordinates = coordinatesFromFile.split()
            if coordinates[0].lower() == 'time':
                coordinates = coordinates[1:]    # discard trivial time dimension for CF-style files
        else:
            coordinates = coordinatesSave        # use input coordinates
    except (KeyError, AttributeError):
        # variable missing from input file, or it has no 'coordinates' attribute
        if coordinates is None or len(coordinates) == 0:
            coordinates = ('lat', 'lon')         # kludge: another possibility

    # Add one dimension + coordinate variable per coordinate (lat, lon, alt, etc.),
    # copying the coordinate data from the input file.
    for coord in coordinates:
        dout.createDimension(coord, din.variables[coord].shape[0])
        var = dout.createVariable(coord, din.variables[coord].dtype, (coord,))
        var[:] = din.variables[coord][:]

    # Add one variable per statistic.
    for k, v in stats.items():
        if k[0] == '_':
            continue
        var = dout.createVariable(k, v.dtype, coordinates)
        fillVal = default_fillvals[v.dtype.str.strip("<>")]  # strip endian marker from dtype to do lookup
        var.setncattr('_FillValue', fillVal)
        var[:] = v[:]

        # Copy attributes from the primary variable in the input file
        # (skipped for 'count', whose units differ from the variable's).
        if k == 'count':
            continue
        try:
            vin = din.variables[variable]
            for a in vin.ncattrs():
                if a in ('scale_factor', 'add_offset', '_FillValue'):
                    continue
                var.setncattr(a, vin.getncattr(a))
        except KeyError:
            pass

    din.close()
    dout.close()
    return outFile
def histogram(vals, variable, n, outFile):
    '''Plot a histogram of the values and save it beside outFile as <name>_hist.png.

    Axis limits are fixed for SST in degrees Celsius; returns the figure path.
    '''
    figFile = '%s_hist.png' % os.path.splitext(outFile)[0]
    M.clf()
    M.hist(vals, 94)
    M.xlim(-5, 45)
    M.xlabel('SST in degrees Celsius')
    M.ylim(0, 300000)
    M.ylabel('Count')
    M.title('Histogram of %s %d-day Mean from %s' % (variable.upper(), n, outFile))
    M.show()
    sys.stderr.write('Writing histogram plot to %s\n' % figFile)
    M.savefig(figFile)
    return figFile
def main(args):
    '''Command-line driver.

    args: [dataset, nEpochs, nWindow, averager, sparkConfig, outHdfsPath]
    Returns the list of results from computeClimatology.
    '''
    dsName = args[0]
    nEpochs, nWindow = int(args[1]), int(args[2])
    averager, sparkConfig, outHdfsPath = args[3], args[4], args[5]

    results = computeClimatology(dsName, nEpochs, nWindow, averager, outHdfsPath, sparkConfig)

    # When plotting was enabled each result is an (outFile, plotFile) pair;
    # animate the plot files into a movie.
    if isinstance(results[0], tuple):
        plotFiles = [r[1] for r in results]
        makeMovie(plotFiles, 'clim.mpg')
    return results
-0,0 +1,32 @@ +# climatology-gaussianInterp module + +Compute N-day climatology from daily ocean data files. + +See bottom of climatology2.py for examples of what to run. + +For speedup, the gaussian weighting is also implemented in Fortran +and Cython. The pure python version is abysmally slow, of course, +and even the Fortran takes a while (evaluating too many exponentials). + +To build gaussInterp_f.so from gaussInterp_f.f, run the line in +gaussInterp_f.mk. + + + +3 climatologies to generate. Daily data is already downloaded to server-1. Location (a) is shown below for each. + +1. MODIS CHL_A +a. server-1:/data/share/datasets/MODIS_L3m_DAY_CHL_chlor_a_4km/daily_data +b. https://oceandata.sci.gsfc.nasa.gov/MODIS-Aqua/Mapped/Daily/4km/chlor_a + +2. Measures SSH +a. server-1:/data/share/datasets/MEASURES_SLA_JPL_1603/daily_data +b. http://podaac.jpl.nasa.gov/dataset/SEA_SURFACE_HEIGHT_ALT_GRIDS_L4_2SATS_5DAY_6THDEG_V_JPL1609 +c. ftp://podaac-ftp.jpl.nasa.gov/allData/merged_alt/L4/cdr_grid/ + +3. CCMP Wind +a. server-1:/data/share/datasets/CCMP_V2.0_L3.0/daily_data/ +b. 
https://podaac.jpl.nasa.gov/dataset/CCMP_MEASURES_ATLAS_L4_OW_L3_0_WIND_VECTORS_FLK + + + http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/__init__.py ---------------------------------------------------------------------- diff --git a/climatology/clim/__init__.py b/climatology/clim/__init__.py new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/binsum.f ---------------------------------------------------------------------- diff --git a/climatology/clim/binsum.f b/climatology/clim/binsum.f new file mode 100644 index 0000000..96b65af --- /dev/null +++ b/climatology/clim/binsum.f @@ -0,0 +1,64 @@ +c $Revision: 2.4 $ +c subprogram binsum + +c-- calculate interpolation weights and vsum + + subroutine binsum( nf, x, y, z, f, + & xx, yy, zz, vsum, + & ixmin, ixmax, iymin, + & iymax, izmin, izmax, + & imax, jmax, kmax, ndatamax ) + + real*4 x(ndatamax),y(ndatamax),z(ndatamax),f(ndatamax) + real*4 xx(imax),yy(jmax),zz(kmax) + real*4 vsum(2,imax,jmax,kmax) + integer*4 ixmin(ndatamax),ixmax(ndatamax) + integer*4 iymin(ndatamax),iymax(ndatamax) + integer*4 izmin(ndatamax),izmax(ndatamax) + + common /parms/ imin,jmin,kmin, + & xwin2,ywin2,zwin2, + & xh,yh,zh, + & dx,dy,dz, + & xlo,xhi,ylo,yhi,zlo, + & dxd,dyd + + data xfac/-0.6931/ + + do n=1,nf + ixmin(n)=max(nint((x(n)-xwin2-xlo+0.5*dx)/dx),1) + ixmax(n)=min(nint((x(n)+xwin2-xlo+0.5*dx)/dx),imax) + iymin(n)=max(nint((y(n)-ywin2-ylo+0.5*dy)/dy),1) + iymax(n)=min(nint((y(n)+ywin2-ylo+0.5*dy)/dy),jmax) + izmin(n)=max(nint((z(n)-zwin2-zlo+0.5*dz)/dz),1) + izmax(n)=min(nint((z(n)+zwin2-zlo+0.5*dz)/dz),kmax) +c print *, x(n),y(n),z(n), f(n) +c print *,' ixmin, ixmax', ixmin(n), ixmax(n) +c print *,' iymin, iymax', iymin(n), iymax(n) +c print *,' izmin, izmax', izmin(n), izmax(n) + enddo + + + + do n=1,nf + do kk=izmin(n),izmax(n) + do jj=iymin(n),iymax(n) + do ii=ixmin(n),ixmax(n) + +c- - this is the 
def isLocalFile(url):
    '''Return (isLocal, path) for a URL.

    A URL with no scheme or a file: scheme is local; a warning is emitted
    (but True is still returned) when the local path does not exist.
    '''
    parts = urlparse.urlparse(url)
    local = parts.scheme in ('', 'file')
    if local and not os.path.exists(parts.path):
        sys.stderr.write('isLocalFile: File at local path does not exist: %s\n' % parts.path)
    return (local, parts.path)
def hdfsCopyToLocal(src, dest):
    '''Copy an HDFS file to a local directory, overwriting any previous copy.

    Returns the local path the file lands at.  NOTE(review): src/dest are
    interpolated into a shell command; callers must pass trusted paths.
    '''
    outPath = os.path.join(dest, os.path.split(src)[1])
    # BUG FIX: os.unlink() raises OSError when there is no previous copy, so
    # only remove the target when it actually exists.
    if os.path.exists(outPath):
        os.unlink(outPath)
    cmd = "hadoop fs -copyToLocal %s %s" % (src, dest)
    sys.stderr.write("Exec overwrite: %s\n" % cmd)
    os.system(cmd)
    return outPath
def average(a):
    '''Per-pixel mean over the leading (epoch) axis; masked cells are ignored.'''
    result = N.ma.mean(a, axis=0)
    return result
def climByAveragingPeriods(urls,              # list of (daily) granule URLs for a long time period (e.g. a year)
              nEpochs,                        # compute a climatology for every N epochs (days) by 'averaging'
              nWindow,                        # number of epochs in window needed for averaging
              variable,                       # name of primary variable in file
              mask,                           # name of mask variable
              coordinates,                    # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
              maskFn=qcMask,                  # mask function to compute mask from mask variable
              averager='pixelAverage',        # averaging function to use, one of ['pixelAverage', 'gaussInterp']
              mode='sequential',              # 'sequential' map, 'multicore' using pathos pool.map(),
                                              # 'cluster'/'spark' not yet implemented (fall back to sequential)
              numNodes=1,                     # number of cluster nodes to use
              nWorkers=4,                     # number of parallel workers per node
              averagingFunctions=AveragingFunctions,  # dict of possible averaging functions
              legalModes=ExecutionModes       # list of possible execution modes
             ):
    '''Compute a climatology every N days by applying a mask and averaging function.
Writes the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary.
***Assumption: This routine assumes that the N grids will fit in memory.***
    '''
    try:
        averageFn = averagingFunctions[averager]
    except KeyError:
        averageFn = average
        sys.stderr.write('climatology: Error, Averaging function must be one of: %s\n' % str(averagingFunctions))

    urlSplits = [s for s in fixedSplit(urls, nEpochs)]
    if VERBOSE:
        sys.stderr.write('%s\n' % str(urlSplits))

    def climsContoured(urls):
        '''Average one N-day split of granules and write its contour plot.'''
        n = len(urls)
        var = climByAveraging(urls, variable, mask, coordinates, maskFn, averageFn)
        return contourMap(var, variable, coordinates, n, urls[0])

    # BUG FIX: the original code unconditionally re-ran the sequential map
    # AFTER this dispatch, recomputing everything and discarding the parallel
    # results; it also left `plots` undefined for 'cluster'/'spark' modes.
    if mode == 'multicore':
        pool = Pool(nWorkers)
        plots = pool.map(climsContoured, urlSplits)
    elif mode in ('cluster', 'spark'):
        # Not implemented yet; run sequentially so results are still produced.
        sys.stderr.write("climatology: Warning, mode '%s' not implemented, running sequentially.\n" % mode)
        plots = map(climsContoured, urlSplits)
    else:  # 'sequential'
        plots = map(climsContoured, urlSplits)
    print(plots)
    return plots
#    return makeMovie(plots, 'clim.mpg')
'lat' and 'lon') + maskFn=qcMask, # mask function to compute mask from mask variable + averageFn=average # averaging function to use + ): + '''Compute a climatology over N arrays by applying a mask and averaging function. +Returns the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary. +***Assumption: This routine assumes that the N grids will fit in memory.*** + ''' + n = len(urls) + varList = [variable, mask] + for i, url in enumerate(urls): + fn = retrieveFile(url, '~/cache') + if VERBOSE: print >>sys.stderr, 'Read variables and mask ...' + var, fh = getVariables(fn, varList) # return dict of variable objects by name + if i == 0: + dtype = var[variable].dtype + shape = (n,) + var[variable].shape + accum = N.ma.empty(shape, dtype) + + v = maskFn(var[variable], var[mask]) # apply quality mask variable to get numpy MA +# v = var[variable][:] + accum[i] = v # accumulate N arrays for 'averaging' + if i+1 != len(urls): # keep var dictionary from last file to grab metadata + close(fh) # REMEMBER: closing fh loses in-memory data structures + + if VERBOSE: print >>sys.stderr, 'Averaging ...' 
+ coord, fh = getVariables(fn, coordinates) # read coordinate arrays and add to dict + for c in coordinates: var[c] = coord[c][:] + + if averageFn == average: + avg = averageFn(accum) # call averaging function + else: + var[variable] = accum + if averageFn == gaussInterp: + varNames = variable + coordinates + avg, vweight, status = \ + gaussInterp(var, varNames, latGrid, lonGrid, wlat, wlon, slat, slon, stime, vfactor, missingValue) + + var['attributes'] = var[variable].__dict__ # save attributes of primary variable + var[variable] = avg # return primary variable & mask arrays in dict + var[mask] = N.ma.getmask(avg) + +# close(fh) # Can't close, lose netCDF4.Variable objects, leaking two fh + return var + + +def contourMap(var, variable, coordinates, n, url): + p = urlparse.urlparse(url) + filename = os.path.split(p.path)[1] + return filename + outFile = filename + '.png' + # Downscale variable array (SST) before contouring, matplotlib is TOO slow on large arrays + vals = var[variable][:] + + # Fixed color scale, write file, turn off auto borders, set title, reverse lat direction so monotonically increasing?? 
+ imageMap(var[coordinates[1]][:], var[coordinates[0]][:], var[variable][:], + vmin=-2., vmax=45., outFile=outFile, autoBorders=False, + title='%s %d-day Mean from %s' % (variable.upper(), n, filename)) + print >>sys.stderr, 'Writing contour plot to %s' % outFile + return outFile + + +def isLocalFile(url): + '''Check if URL is a local path.''' + u = urlparse.urlparse(url) + if u.scheme == '' or u.scheme == 'file': + if not path.exists(u.path): + print >>sys.stderr, 'isLocalFile: File at local path does not exist: %s' % u.path + return (True, u.path) + else: + return (False, u.path) + + +def retrieveFile(url, dir=None): + '''Retrieve a file from a URL, or if it is a local path then verify it exists.''' + if dir is None: dir = './' + ok, path = isLocalFile(url) + fn = os.path.split(path)[1] + outPath = os.path.join(dir, fn) + if not ok: + if os.path.exists(outPath): + print >>sys.stderr, 'retrieveFile: Using cached file: %s' % outPath + else: + try: + print >>sys.stderr, 'retrieveFile: Retrieving (URL) %s to %s' % (url, outPath) + urllib.urlretrieve(url, outPath) + except: + print >>sys.stderr, 'retrieveFile: Cannot retrieve file at URL: %s' % url + return None + return outPath + + +def dailyFile2date(path, offset=1): + '''Convert YYYYDOY string in filename to date.''' + fn = os.path.split(path)[1] + year = int(fn[offset:offset+4]) + doy = int(fn[offset+5:offset+8]) + return fn[5:15].replace('.', '/') + + +def formatRegion(r): + """Format lat/lon region specifier as string suitable for file name.""" + if isinstance(r, str): + return r + else: + strs = [str(i).replace('-', 'm') for i in r] + return 'region-%s-%sby%s-%s' % tuple(strs) + + +def formatGrid(r): + """Format lat/lon grid resolution specifier as string suitable for file name.""" + if isinstance(r, str): + return r + else: + return str(r[0]) + 'by' + str(r[1]) + + +def main(args): + nEpochs = int(args[0]) + nWindow = int(args[1]) + averager = args[2] + mode = args[3] + nWorkers = int(args[4]) + urlFile = 
args[5] + urls = [s.strip() for s in open(urlFile, 'r')] + return climByAveragingPeriods(urls, nEpochs, nWindow, 'sst', 'qual_sst', ['lat', 'lon'], + averager=averager, mode=mode, nWorkers=nWorkers) + + +if __name__ == '__main__': + print main(sys.argv[1:]) + + +# python climatology.py 5 5 pixelAverage sequential 1 urls_sst_10days.txt +# python climatology.py 5 5 gaussianInterp multicore 8 urls_sst_40days.txt +
