http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/climatology1.py ---------------------------------------------------------------------- diff --git a/climatology/clim/climatology1.py b/climatology/clim/climatology1.py new file mode 100755 index 0000000..940a8d0 --- /dev/null +++ b/climatology/clim/climatology1.py @@ -0,0 +1,233 @@ +""" +climatology.py + +Compute a multi-epoch (multi-day) climatology from daily SST Level-3 grids. + +Simple code to be run on Spark cluster, or using multi-core parallelism on single machine. + +""" + +import sys, os, calendar, urlparse, urllib +from datetime import datetime +import numpy as N +from variables import getVariables, close +#from timePartitions import partitionFilesByKey +from split import fixedSplit +#from stats import Stats +from pathos.multiprocessing import ProcessingPool as Pool +from plotlib import imageMap, makeMovie + +#from gaussInterp import gaussInterp + +VERBOSE = 1 + +# Possible execution modes +# Multicore & cluster modes use pathos pool.map(); Spark mode uses PySpark cluster. +ExecutionModes = ['sequential', 'multicore', 'cluster', 'spark'] + +# SST L3m 4.6km Metadata +# SST calues are scaled integers in degrees Celsius, lat/lon is 4320 x 8640 +# Variable = 'sst', Mask = 'qual_sst', Coordinates = ['lat', 'lon'] + +# Generate algorithmic name for N-day Climatology product +SSTClimatologyTemplate = 'SST.L3.Global.Clim.%(period)s.%(date)s.%(version)s.nc' #?? + +# Simple mask and average functions to get us started, then add gaussian interpolation. 
# MODIS L3 SST product: qual_sst flags are in [-1, 2]; 0 is best data (could
# also accept 1 for better coverage).
def qcMask(var, mask):
    '''Apply a quality-control mask variable to a data grid.

    Returns a numpy masked array in which every pixel whose QC flag is nonzero
    is masked out (N.ma.make_mask treats any nonzero flag as masked).
    '''
    return N.ma.array(var, mask=N.ma.make_mask(mask))
#def qcMask(var, mask): return N.ma.masked_where(mask != 0, var)


def average(a):
    '''Pixel-wise mean over the first (time) axis of a masked-array stack.'''
    return N.ma.mean(a, axis=0)

# 'gaussInterp' is temporarily aliased to the simple pixel average until the
# real gaussian-interpolation routine is enabled (its import is commented out).
AveragingFunctions = {'pixelAverage': average, 'gaussInterp': average}


def climByAveragingPeriods(urls,              # list of (daily) granule URLs for a long time period (e.g. a year)
              nEpochs,                        # compute a climatology for every N epochs (days) by 'averaging'
              nWindow,                        # number of epochs in window needed for averaging
              variable,                       # name of primary variable in file
              mask,                           # name of mask variable
              coordinates,                    # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
              maskFn=qcMask,                  # mask function to compute mask from mask variable
              averager='pixelAverage',        # averaging function to use, one of ['pixelAverage', 'gaussInterp']
              mode='sequential',              # 'sequential' map, 'multicore' using pathos pool.map(),
                                              # 'cluster' or 'spark' (not implemented yet)
              numNodes=1,                     # number of cluster nodes to use
              nWorkers=4,                     # number of parallel workers per node
              averagingFunctions=AveragingFunctions,  # dict of possible averaging functions
              legalModes=ExecutionModes       # list of possible execution modes
             ):
    '''Compute a climatology every N days by applying a mask and averaging function.

    Splits the URL list into N-epoch batches, averages each batch, and writes a
    contour plot per batch.  Returns the list of plot file names.
    ***Assumption: This routine assumes that the N grids will fit in memory.***
    '''
    try:
        averageFn = averagingFunctions[averager]
    except KeyError:
        # Fall back to the simple pixel average rather than dying.
        sys.stderr.write('climatology: Error, Averaging function must be one of: %s\n' % str(averagingFunctions))
        averageFn = average

    urlSplits = [s for s in fixedSplit(urls, nEpochs)]
    if VERBOSE: sys.stderr.write('%s\n' % str(urlSplits))

    def climsContoured(urls):
        '''Average one N-day batch of granule URLs and write a contour plot.'''
        n = len(urls)
        var = climByAveraging(urls, variable, mask, coordinates, maskFn, averageFn)
        return contourMap(var, variable, coordinates, n, urls[0])

    if mode == 'sequential':
        plots = [climsContoured(u) for u in urlSplits]
    elif mode == 'multicore':
        pool = Pool(nWorkers)
        plots = pool.map(climsContoured, urlSplits)
    elif mode in legalModes:
        # BUG FIX: 'cluster'/'spark' used to fall through with 'plots' undefined.
        raise NotImplementedError('climByAveragingPeriods: mode %s not implemented yet' % mode)
    else:
        raise ValueError('climByAveragingPeriods: mode must be one of %s, got %s' % (str(legalModes), mode))

    # BUG FIX: the original re-ran the sequential map here unconditionally,
    # doubling the work and discarding any multicore results.
    print(plots)
    return plots
#    return makeMovie(plots, 'clim.mpg')


def climByAveraging(urls,              # list of granule URLs for a time period
                    variable,          # name of primary variable in file
                    mask,              # name of mask variable
                    coordinates,       # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
                    maskFn=qcMask,     # mask function to compute mask from mask variable
                    averageFn=average  # averaging function to use
                   ):
    '''Compute a climatology over N arrays by applying a mask and averaging function.

    Returns a dict holding the averaged variable grid, the attributes of the
    primary variable, and the coordinate arrays.
    ***Assumption: This routine assumes that the N grids will fit in memory.***
    '''
    n = len(urls)
    varList = [variable, mask]
    for i, url in enumerate(urls):
        fn = retrieveFile(url, '~/cache')
        if VERBOSE: sys.stderr.write('Read variables and mask ...\n')
        var, fh = getVariables(fn, varList)         # dict of variable objects by name
        if i == 0:
            # Allocate the accumulation stack once, using the first grid's shape/dtype.
            dtype = var[variable].dtype
            shape = (n,) + var[variable].shape
            accum = N.ma.empty(shape, dtype)

        v = maskFn(var[variable], var[mask])        # apply quality mask to get a masked array
        accum[i] = v                                # accumulate N arrays for 'averaging'
        if i + 1 != n:                              # keep var dict from LAST file to grab metadata below
            close(fh)                               # REMEMBER: closing fh loses in-memory data structures

    if VERBOSE: sys.stderr.write('Averaging ...\n')
    coord, fh = getVariables(fn, coordinates)       # read coordinate arrays and add to dict
    for c in coordinates:
        var[c] = coord[c][:]

    # NOTE: the original dispatched to gaussInterp here, but that import is
    # disabled and AveragingFunctions maps 'gaussInterp' to the pixel average,
    # so the branch was dead code referencing undefined names.  Call the
    # supplied averaging function directly.
    avg = averageFn(accum)

    var['attributes'] = var[variable].__dict__      # save attributes of primary variable
    var[variable] = avg                             # return primary variable & mask arrays in dict
    var[mask] = N.ma.getmask(avg)

#    close(fh)        # Can't close, lose netCDF4.Variable objects, leaking two fh
    return var


def contourMap(var, variable, coordinates, n, url):
    '''Write a fixed-scale contour map of the averaged grid; return the PNG path.'''
    p = urlparse.urlparse(url)
    filename = os.path.split(p.path)[1]
    # BUG FIX: a premature 'return filename' here made all the plotting code
    # below unreachable.
    outFile = filename + '.png'
    # TODO: Downscale variable array (SST) before contouring, matplotlib is TOO slow on large arrays.

    # Fixed color scale, write file, turn off auto borders, set title.
    imageMap(var[coordinates[1]][:], var[coordinates[0]][:], var[variable][:],
             vmin=-2., vmax=45., outFile=outFile, autoBorders=False,
             title='%s %d-day Mean from %s' % (variable.upper(), n, filename))
    sys.stderr.write('Writing contour plot to %s\n' % outFile)
    return outFile


def isLocalFile(url):
    '''Check if URL is a local path; return (isLocal, path).'''
    u = urlparse.urlparse(url)
    if u.scheme == '' or u.scheme == 'file':
        # BUG FIX: original called path.exists but only 'os' is imported.
        if not os.path.exists(u.path):
            sys.stderr.write('isLocalFile: File at local path does not exist: %s\n' % u.path)
        return (True, u.path)
    else:
        return (False, u.path)


def retrieveFile(url, dir=None):
    '''Retrieve a file from a URL into dir, or if it is a local path verify it exists.

    Returns the local path of the (cached) file, or None on retrieval failure.
    '''
    if dir is None: dir = './'
    dir = os.path.expanduser(dir)       # BUG FIX: callers pass '~/cache'; os.path.join does not expand '~'
    ok, path = isLocalFile(url)
    if ok:
        # BUG FIX: local files are used in place; the original returned a
        # cache-dir path that was never checked or populated.
        return path
    fn = os.path.split(path)[1]
    outPath = os.path.join(dir, fn)
    if os.path.exists(outPath):
        sys.stderr.write('retrieveFile: Using cached file: %s\n' % outPath)
    else:
        try:
            sys.stderr.write('retrieveFile: Retrieving (URL) %s to %s\n' % (url, outPath))
            urllib.urlretrieve(url, outPath)
        except IOError:
            sys.stderr.write('retrieveFile: Cannot retrieve file at URL: %s\n' % url)
            return None
    return outPath


def dailyFile2date(path, offset=1):
    '''Convert the YYYY.DOY portion of a daily granule filename to a 'YYYY/DOY/...' string.

    The int() conversions validate the expected A<YYYY>.<DOY> filename layout
    (they raise ValueError for nonconforming names).
    '''
    fn = os.path.split(path)[1]
    year = int(fn[offset:offset+4])
    doy = int(fn[offset+5:offset+8])
    return fn[5:15].replace('.', '/')


def formatRegion(r):
    """Format lat/lon region specifier as string suitable for file name."""
    if isinstance(r, str):
        return r
    else:
        strs = [str(i).replace('-', 'm') for i in r]
        return 'region-%s-%sby%s-%s' % tuple(strs)


def formatGrid(r):
    """Format lat/lon grid resolution specifier as string suitable for file name."""
    if isinstance(r, str):
        return r
    else:
        return str(r[0]) + 'by' + str(r[1])


def main(args):
    '''CLI entry point: nEpochs nWindow averager mode nWorkers urlFile.'''
    nEpochs = int(args[0])
    nWindow = int(args[1])
    averager = args[2]
    mode = args[3]
    nWorkers = int(args[4])
    urlFile = args[5]
    with open(urlFile, 'r') as f:               # close the URL list file deterministically
        urls = [s.strip() for s in f]
    return climByAveragingPeriods(urls, nEpochs, nWindow, 'sst', 'qual_sst', ['lat', 'lon'],
                                  averager=averager, mode=mode, nWorkers=nWorkers)


if __name__ == '__main__':
    print(main(sys.argv[1:]))


# python climatology.py 5 5 pixelAverage sequential 1 urls_sst_10days.txt
# python climatology.py 5 5 gaussianInterp multicore 8 urls_sst_40days.txt
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/climatology2.py ---------------------------------------------------------------------- diff --git a/climatology/clim/climatology2.py b/climatology/clim/climatology2.py new file mode 100755 index 0000000..231efb7 --- /dev/null +++ b/climatology/clim/climatology2.py @@ -0,0 +1,453 @@ +""" +climatology2.py + +Compute a multi-epoch (multi-day) climatology from daily SST Level-3 grids. + +Simple code to be run on Spark cluster, or using multi-core parallelism on single machine. + +""" + +import sys, os, urlparse, urllib, re, time +import numpy as N +import matplotlib +matplotlib.use('Agg') +import matplotlib.pylab as M + +from variables import getVariables, close +from cache import retrieveFile, CachePath +from split import fixedSplit, splitByNDays +from netCDF4 import Dataset, default_fillvals +from pathos.multiprocessing import ProcessingPool as Pool +from plotlib import imageMap, makeMovie + +from spatialFilter import spatialFilter +from gaussInterp import gaussInterp # calls into Fortran version gaussInterp_f.so +#from gaussInterp_slow import gaussInterp_slow as gaussInterp # pure python, slow debuggable version + +#import pyximport; pyximport.install(pyimport=False) +#from gaussInterp import gaussInterp, gaussInterp_ # attempted cython version, had issues + +VERBOSE = 1 + +# Possible execution modes +# Multicore & cluster modes use pathos pool.map(); Spark mode uses PySpark cluster. +ExecutionModes = ['sequential', 'multicore', 'cluster', 'spark'] + +# SST L3m 4.6km Metadata +# SST calues are scaled integers in degrees Celsius, lon/lat is 8640 x 4320 +# Variable = 'sst', Mask = 'qual_sst', Coordinates = ['lon', 'lat'] + +# Generate algorithmic name for N-day Climatology product +SSTClimatologyTemplate = 'SST.L3.Global.Clim.%(period)s.%(date)s.%(version)s.nc' #?? + +# Simple mask and average functions to get us started, then add gaussian interpolation. 
# MODIS L3 SST product: qual_sst flags are in [-1, 2]; 0 is best data (could
# also accept 1 for better coverage).
#def qcMask(var, mask): return N.ma.array(var, mask=N.ma.make_mask(mask == 0))

def qcMask(var, mask):
    '''Mask out every pixel whose QC flag is nonzero; returns a numpy masked array.'''
    return N.ma.masked_where(mask != 0, var)
#def qcMask(var, mask): return N.ma.masked_where(mask < 0, var)


def splitModisSst(seq, n):
    '''Split a list of MODIS SST granule URLs into batches of n distinct days (DOY keyed from filename).'''
    for chunk in splitByNDays(seq, n, re.compile(r'(...).L3m'), keyed=False):
        yield chunk


def mean(a):
    '''Pixel-wise mean over the first (time) axis of a masked-array stack.'''
    return N.ma.mean(a, axis=0)

AveragingFunctions = {'pixelMean': mean, 'gaussInterp': gaussInterp, 'spatialFilter': spatialFilter}

PixelMeanConfig = {'name': 'pixelMean'}

# NOTE: the wlat/wlon values below are overwritten from the nNeighbors CLI
# argument in climByAveragingPeriods.
GaussInterpConfig = {'name': 'gaussInterp',
                     'latGrid': None, 'lonGrid': None,   # None means use input lat/lon grid
                     'wlat': 3, 'wlon': 3,
                     'slat': 0.15, 'slon': 0.15, 'stime': 1,
                     'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}

GaussInterpConfig1a = {'name': 'gaussInterp',
                       'latGrid': None, 'lonGrid': None,
                       'wlat': 0.30, 'wlon': 0.30,
                       'slat': 0.15, 'slon': 0.15, 'stime': 1,
                       'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}

GaussInterpConfig1b = {'name': 'gaussInterp',
                       'latGrid': None, 'lonGrid': None,
                       'wlat': 0.08, 'wlon': 0.08,
                       'slat': 0.15, 'slon': 0.15, 'stime': 1,
                       'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}

GaussInterpConfig2 = {'name': 'gaussInterp',
                      'latGrid': (89.5, -89.5, -0.25), 'lonGrid': (-180., 179., 0.25),
                      'wlat': 2., 'wlon': 2.,
                      'slat': 0.15, 'slon': 0.15, 'stime': 1,
                      'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}

FilterGaussian = [[1, 2, 1], [2, 4, 2], [1, 2, 1]]      # divide by 16
FilterLowPass = [[1, 1, 1], [1, 1, 1], [1, 1, 1]]       # divide by 9

SpatialFilterConfig1 = {'name': 'spatialFilter', 'normalization': 16.,
                        'spatialFilter': N.array(FilterGaussian, dtype=N.int32),
                        'missingValue': default_fillvals['f4']}

SpatialFilterConfig2 = {'name': 'spatialFilter', 'normalization': 9.,
                        'spatialFilter': N.array(FilterLowPass, dtype=N.int32),
                        'missingValue': default_fillvals['f4']}


def climByAveragingPeriods(urls,              # list of (daily) granule URLs for a long time period (e.g. a year)
              nEpochs,                        # compute a climatology for every N epochs (days) by 'averaging'
              nWindow,                        # number of epochs in window needed for averaging
              nNeighbors,                     # number of neighbors on EACH side in lat/lon directions to use in averaging
              variable,                       # name of primary variable in file
              mask,                           # name of mask variable
              coordinates,                    # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
              splitFn=splitModisSst,          # split function to use to partition the input URL list
              maskFn=qcMask,                  # mask function to compute mask from mask variable
              averager='pixelMean',           # averaging function, one of ['pixelMean', 'gaussInterp', 'spatialFilter']
              averagingConfig={},             # dict of parameters to control the averaging function (e.g. gaussInterp)
              optimization='fortran',         # optimization mode (fortran or cython)
              mode='sequential',              # 'sequential' map, 'multicore' using pathos pool.map(),
                                              # 'cluster' or 'spark' (not implemented yet)
              numNodes=1,                     # number of cluster nodes to use
              nWorkers=4,                     # number of parallel workers per node
              averagingFunctions=AveragingFunctions,  # dict of possible averaging functions
              legalModes=ExecutionModes,      # list of possible execution modes
              cachePath=CachePath             # directory to cache retrieved files in
             ):
    '''Compute a climatology every N days by applying a mask and averaging function.

    Returns a list of (netCDF file, figure file or None) tuples, one per N-day batch.
    ***Assumption: This routine assumes that the N grids will fit in memory.***
    '''
    # .get() avoids a KeyError for the {} default config.
    if averagingConfig.get('name') == 'gaussInterp':
        # NOTE: this intentionally mutates the caller's config dict (existing contract):
        # the interpolation window comes from the command line.
        averagingConfig['wlat'] = nNeighbors
        averagingConfig['wlon'] = nNeighbors
    try:
        averageFn = averagingFunctions[averager]
    except KeyError:
        sys.stderr.write('climatology: Error, Averaging function must be one of: %s\n' % str(averagingFunctions))
        sys.exit(1)

    urlSplits = [s for s in splitFn(urls, nEpochs)]

    def climsContoured(urls, plot=None, fillValue=default_fillvals['f4'], format='NETCDF4', cachePath=cachePath):
        '''Average one batch of granule URLs, write a NetCDF file, and optionally plot it.'''
        n = len(urls)
        if VERBOSE: sys.stderr.write('%s\n' % str(urls))

        var = climByAveraging(urls, variable, mask, coordinates, maskFn, averageFn,
                              averagingConfig, optimization, cachePath)

        fn = os.path.split(urls[0])[1]
        inFile = os.path.join(cachePath, fn)        # attributes are copied from the cached input granule
        method = averagingConfig['name']
        fn = os.path.splitext(fn)[0]
        day = fn[5:8]                               # DOY of first granule (filename convention A<YYYY>.<DOY>*)
        nDays = int(var['time'][0])

        wlat = averagingConfig.get('wlat', 1)       # default 1 neighbor when config has no window
        # Mark each output file with the first day of its period.
        if int(wlat) == wlat:
            outFile = 'A%s.L3m_%dday_clim_sst_4km_%s_%dnbrs.nc' % (day, nDays, method, int(wlat))
        else:
            outFile = 'A%s.L3m_%dday_clim_sst_4km_%s_%4.2fnbrs.nc' % (day, nDays, method, wlat)

        outFile = writeOutNetcdfVars(var, variable, mask, coordinates, inFile, outFile, fillValue, format)

        if plot == 'contour':
            figFile = contourMap(var, variable, coordinates, n, outFile)
        elif plot == 'histogram':
#            figFile = histogram(var, variable, n, outFile)
            figFile = None
        else:
            figFile = None
        return (outFile, figFile)

    if mode == 'sequential':
        results = [climsContoured(u) for u in urlSplits]
    elif mode == 'multicore':
        pool = Pool(nWorkers)
        results = pool.map(climsContoured, urlSplits)
    elif mode in legalModes:
        # BUG FIX: 'cluster'/'spark' used to fall through with 'results' undefined.
        raise NotImplementedError('climByAveragingPeriods: mode %s not implemented yet' % mode)
    else:
        raise ValueError('climByAveragingPeriods: mode must be one of %s, got %s' % (str(legalModes), mode))

    return results


def climByAveraging(urls,                   # list of granule URLs for a time period
                    variable,               # name of primary variable in file
                    mask,                   # name of mask variable
                    coordinates,            # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
                    maskFn=qcMask,          # mask function to compute mask from mask variable
                    averageFn=mean,         # averaging function to use
                    averagingConfig={},     # parameters to control averaging function (e.g. gaussInterp)
                    optimization='fortran', # optimization mode (fortran or cython)
                    cachePath=CachePath
                   ):
    '''Compute a climatology over N arrays by applying a mask and averaging function.

    Returns a dict holding the averaged grid ('out'+variable), its mask, the
    output coordinate arrays ('outlat'/'outlon'), and the input metadata.
    ***Assumption: This routine assumes that the N grids will fit in memory.***
    '''
    n = len(urls)
    varList = [variable, mask]
    var = {}
    vtime = N.zeros((n,), N.int32)
    accum = None            # allocated on first successful read
    emptyVar = None         # fully-masked placeholder grid for failed reads
    badIndices = []         # BUG FIX: failures before the first successful read used to
                            # write accum[i] before accum existed (NameError)

    for i, url in enumerate(urls):
        try:
            path = retrieveFile(url, cachePath)
            fn = os.path.split(path)[1]
            vtime[i] = int(fn[5:8])     # KLUDGE: extract DOY from filename
        except (IOError, OSError, ValueError, AttributeError, TypeError):
            sys.stderr.write('climByAveraging: Error, continuing without file %s\n' % url)
            badIndices.append(i)
            continue
        if path is None:
            badIndices.append(i)
            continue
        sys.stderr.write('Read variables and mask ...\n')
        try:
            # dict of variable arrays by name; netCDF4 auto-masking turned off
            var, fh = getVariables(path, varList, arrayOnly=True, order='F', set_auto_mask=False)
        except (IOError, KeyError, RuntimeError):
            sys.stderr.write('climByAveraging: Error, cannot read file %s\n' % path)
            badIndices.append(i)
            continue
        if accum is None:
            dtype = var[variable].dtype
            if 'int' in dtype.name: dtype = N.float32   # promote scaled-integer SST to float
            shape = (n,) + var[variable].shape
            accum = N.ma.empty(shape, dtype, order='F')
            # Totally masked variable array for missing or bad file reads.
            emptyVar = N.array(N.ma.masked_all(var[variable].shape, dtype), order='F')

        sys.stderr.write('Read coordinates ...\n')
        var, fh = getVariables(path, coordinates, var, arrayOnly=True, order='F')   # add coord arrays to dict

        # Apply quality mask variable to get a numpy masked array.
        var[variable] = maskFn(var[variable], var[mask])

        # Echo variable range for sanity check.
        vals = var[variable].compressed()
        sys.stderr.write('Variable Range: min, max: %s %s\n' % (vals.min(), vals.max()))

        accum[i] = var[variable]        # accumulate N arrays for 'averaging'
        close(fh)

    if accum is None:
        # Every granule failed to read; nothing to average.
        raise RuntimeError('climByAveraging: no readable granules in %s' % str(urls))
    for j in badIndices:
        accum[j] = emptyVar             # backfill failed slots with fully-masked grids

    coordinates = ['time'] + coordinates    # add constructed time (days) as first coordinate
    var['time'] = vtime

    name = averagingConfig.get('name')
    if name == 'pixelMean':
        sys.stderr.write('Doing Pixel Average over %d grids ...\n' % n)
        start = time.time()
        avg = averageFn(accum)
        end = time.time()
        sys.stderr.write('pixelMean execution time: %s\n' % (end - start))
        outlat = var[coordinates[1]].astype(N.float32)[:]
        outlon = var[coordinates[2]].astype(N.float32)[:]
    elif name == 'gaussInterp':
        sys.stderr.write('Doing Gaussian Interpolation over %d grids ...\n' % n)
        var[variable] = accum
        c = averagingConfig
        latGrid = c['latGrid']; lonGrid = c['lonGrid']
        if latGrid is not None and lonGrid is not None:
            # BUG FIX: N.arange has no 'order' keyword (raised TypeError);
            # 1-D arrays have no memory-order distinction anyway.
            outlat = N.arange(latGrid[0], latGrid[1]+latGrid[2], latGrid[2], dtype=N.float32)
            outlon = N.arange(lonGrid[0], lonGrid[1]+lonGrid[2], lonGrid[2], dtype=N.float32)
        else:
            outlat = N.array(var[coordinates[1]], dtype=N.float32, order='F')
            outlon = N.array(var[coordinates[2]], dtype=N.float32, order='F')
        varNames = [variable] + coordinates
        start = time.time()
        avg, weight, status = \
            gaussInterp(var, varNames, outlat, outlon, c['wlat'], c['wlon'],
                        c['slat'], c['slon'], c['stime'], c['vfactor'], c['missingValue'],
                        VERBOSE, optimization)
        end = time.time()
        var['outweight'] = weight.astype(N.float32)
        sys.stderr.write('gaussInterp execution time: %s\n' % (end - start))
    elif name == 'spatialFilter':
        sys.stderr.write('Applying Spatial 3x3 Filter and then averaging over %d grids ...\n' % n)
        var[variable] = accum
        c = averagingConfig
        varNames = [variable] + coordinates
        start = time.time()
        avg, count, status = \
            spatialFilter(var, varNames, c['spatialFilter'], c['normalization'],
                          c['missingValue'], VERBOSE, optimization)
        end = time.time()
        sys.stderr.write('spatialFilter execution time: %s\n' % (end - start))
        outlat = var[coordinates[1]].astype(N.float32)[:]
        outlon = var[coordinates[2]].astype(N.float32)[:]
    else:
        raise ValueError('climByAveraging: unrecognized averaging method %s' % str(name))

    var['out'+variable] = avg.astype(N.float32)     # return primary variable & mask arrays in dict
    var['out'+mask] = N.ma.getmask(avg)
    var['outlat'] = outlat
    var['outlon'] = outlon
    return var


def writeOutNetcdfVars(var, variable, mask, coordinates, inFile, outFile, fillValue=None, format='NETCDF4'):
    '''Write the averaged grids to a NetCDF file, copying dimensions and attributes
from the input granule.  Output variables and attributes keep the input names.
Returns the output file path.
    '''
    din = Dataset(inFile, 'r')
    dout = Dataset(outFile, 'w', format=format)
    sys.stderr.write('Writing %s ...\n' % outFile)

    # Transfer global attributes from input file.
    for a in din.ncattrs():
        dout.setncattr(a, din.getncattr(a))

    # Add dimensions and coordinate variables, copying data (output lat, lon, etc.).
    for coord in coordinates:
        dout.createDimension(coord, var['out'+coord].shape[0])
        v = dout.createVariable(coord, var['out'+coord].dtype, (coord,))
        v[:] = var['out'+coord][:]
    primaryVar = dout.createVariable(variable, var['out'+variable].dtype, coordinates, fill_value=fillValue)
    primaryVar[:] = var['out'+variable][:]          # transfer array
    maskVar = dout.createVariable(mask, 'i1', coordinates)
    maskVar[:] = var['out'+mask].astype('i1')[:]

    # Transfer per-variable attributes from input file.
    for k, v in dout.variables.items():
        for a in din.variables[k].ncattrs():
            # Averaged data is already unpacked floats; dropping scale/offset
            # and _FillValue keeps readers from re-scaling it.
            if a in ('scale_factor', 'add_offset', '_FillValue'): continue
            v.setncattr(a, din.variables[k].getncattr(a))
        if k == variable:
            try:
                if fillValue is None: fillValue = default_fillvals['f4']
                v.setncattr('missing_value', fillValue)
                sys.stderr.write('Setting missing_value for primary variable %s to %f\n' % (variable, fillValue))
            except (KeyError, TypeError, RuntimeError):
                sys.stderr.write('writeOutNetcdfVars: Warning, for variable %s no fill value specified or derivable from inputs.\n' % variable)
    din.close()
    dout.close()
    return outFile


def contourMap(var, variable, coordinates, n, outFile):
    '''Write a fixed-scale contour map of the averaged grid; return the PNG path.'''
    # BUG FIX: the figure name used the '_hist' suffix copy-pasted from histogram().
    figFile = os.path.splitext(outFile)[0] + '_contour.png'
    # TODO: Downscale variable array (SST) before contouring, matplotlib is TOO slow on large arrays.

    # Fixed color scale, write file, turn off auto borders, set title.
    imageMap(var[coordinates[1]][:], var[coordinates[0]][:], var[variable][:],
             vmin=-2., vmax=45., outFile=figFile, autoBorders=False,
             title='%s %d-day Mean from %s' % (variable.upper(), n, outFile))
    sys.stderr.write('Writing contour plot to %s\n' % figFile)
    return figFile


def histogram(vals, variable, n, outFile):
    '''Write a histogram of the (compressed) variable values; return the PNG path.'''
    figFile = os.path.splitext(outFile)[0] + '_hist.png'
    M.clf()
#    M.hist(vals, 47, (-2., 45.))
    M.hist(vals, 94)
    M.xlim(-5, 45)
    M.xlabel('SST in degrees Celsius')
    M.ylim(0, 300000)
    M.ylabel('Count')
    M.title('Histogram of %s %d-day Mean from %s' % (variable.upper(), n, outFile))
    M.show()        # no-op under the Agg backend; kept for interactive use
    sys.stderr.write('Writing histogram plot to %s\n' % figFile)
    M.savefig(figFile)
    return figFile


def dailyFile2date(path, offset=1):
    '''Convert the YYYY.DOY portion of a daily granule filename to a 'YYYY/DOY/...' string.

    The int() conversions validate the expected A<YYYY>.<DOY> filename layout
    (they raise ValueError for nonconforming names).
    '''
    fn = os.path.split(path)[1]
    year = int(fn[offset:offset+4])
    doy = int(fn[offset+5:offset+8])
    return fn[5:15].replace('.', '/')


def formatRegion(r):
    """Format lat/lon region specifier as string suitable for file name."""
    if isinstance(r, str):
        return r
    else:
        strs = [str(i).replace('-', 'm') for i in r]
        return 'region-%s-%sby%s-%s' % tuple(strs)


def formatGrid(r):
    """Format lat/lon grid resolution specifier as string suitable for file name."""
    if isinstance(r, str):
        return r
    else:
        return str(r[0]) + 'by' + str(r[1])


def main(args):
    '''CLI entry point: nEpochs nWindow nNeighbors averager optimization mode nWorkers urlFile.'''
    nEpochs = int(args[0])
    nWindow = int(args[1])
    nNeighbors = int(args[2])
    averager = args[3]
    optimization = args[4]
    mode = args[5]
    nWorkers = int(args[6])
    urlFile = args[7]
    with open(urlFile, 'r') as f:       # close the URL list file deterministically
        urls = [s.strip() for s in f]

    # Dispatch table replaces the triplicated if/elif call sites.
    configs = {'gaussInterp': GaussInterpConfig,
               'spatialFilter': SpatialFilterConfig1,
               'pixelMean': PixelMeanConfig}
    try:
        config = configs[averager]
    except KeyError:
        sys.stderr.write('climatology2: Error, averager must be one of %s\n' % str(AveragingFunctions.keys()))
        sys.exit(1)

    results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'],
                                     averager=averager, optimization=optimization, averagingConfig=config,
                                     mode=mode, nWorkers=nWorkers)

    # Guard against an empty result list before peeking at results[0].
    if results and results[0][1] is not None:
        makeMovie([r[1] for r in results], 'clim.mpg')
    return results


if __name__ == '__main__':
    print(main(sys.argv[1:]))


# Old Tests:
# python climatology2.py 5 5 0 pixelMean fortran sequential 1 urls_sst_10days.txt
# python climatology2.py 5 5 3 gaussInterp fortran sequential 1 urls_sst_10days.txt
# python climatology2.py 5 5 0 pixelMean fortran sequential 1 urls_sst_40days.txt
# python climatology2.py 5 5 0 pixelMean fortran multicore 8 urls_sst_40days.txt
# python climatology2.py 5 5 3 gaussInterp fortran multicore 8 urls_sst_40days.txt

# Old Production:
# python climatology2.py 5 5 0 pixelMean fortran multicore 16 urls_sst_2015.txt >& log &
# python climatology2.py 10 10 0 pixelMean fortran multicore 16 urls_sst_2015.txt >& log &
# python climatology2.py 5 5 3 gaussInterp fortran multicore 16 urls_sst_2015.txt >& log &

# Tests:
# python climatology2.py 5 5 0 pixelMean fortran sequential 1 urls_sst_daynight_5days_sorted.txt
# python climatology2.py 5 5 0 pixelMean
fortran multicore 4 urls_sst_daynight_5days_sorted.txt +# python climatology2.py 5 5 3 gaussInterp fortran sequential 1 urls_sst_daynight_5days_sorted.txt +# python climatology2.py 5 5 3 gaussInterp fortran multicore 4 urls_sst_daynight_5days_sorted.txt +# python climatology2.py 5 7 1 spatialFilter fortran sequential 1 urls_sst_daynight_5days_sorted.txt + +# Test number of neighbors needed: +# python climatology2.py 5 7 3 gaussInterp fortran multicore 4 urls_sst_daynight_20days_sorted.txt + + +# Production: +# python climatology2.py 5 7 0 pixelMean fortran multicore 4 urls_sst_daynight_2003_2015_sorted.txt +# python climatology2.py 5 7 3 gaussInterp fortran sequential 1 urls_sst_daynight_2003_2015_sorted.txt +# python climatology2.py 5 7 3 gaussInterp fortran multicore 4 urls_sst_daynight_2003_2015_sorted.txt +# python climatology2.py 5 7 1 spatialFilter fortran multicore 4 urls_sst_daynight_2003_2015_sorted.txt + http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/climatology3Spark.py ---------------------------------------------------------------------- diff --git a/climatology/clim/climatology3Spark.py b/climatology/clim/climatology3Spark.py new file mode 100755 index 0000000..14a5f59 --- /dev/null +++ b/climatology/clim/climatology3Spark.py @@ -0,0 +1,418 @@ +""" +climatology3Spark.py + +Compute a multi-epoch (multi-day) climatology from daily SST Level-3 grids. + +Simple code to be run on Spark cluster, or using multi-core parallelism on single machine. 
+ +""" + +import sys, os, urlparse, urllib, re, time +import numpy as N +import matplotlib +matplotlib.use('Agg') +import matplotlib.pylab as M + +from variables import getVariables, close +from cache import retrieveFile, CachePath +from split import fixedSplit, splitByNDays +from netCDF4 import Dataset, default_fillvals +from plotlib import imageMap, makeMovie + +from spatialFilter import spatialFilter +from gaussInterp import gaussInterp # calls into Fortran version gaussInterp_f.so +#from gaussInterp_slow import gaussInterp_slow as gaussInterp # pure python, slow debuggable version + +VERBOSE = 1 + +# Possible execution modes +ExecutionModes = ['multicore', 'spark'] + +# SST L3m 4.6km Metadata +# SST calues are scaled integers in degrees Celsius, lon/lat is 8640 x 4320 +# Variable = 'sst', Mask = 'qual_sst', Coordinates = ['lon', 'lat'] + +# Generate algorithmic name for N-day Climatology product +SSTClimatologyTemplate = 'SST.L3.Global.Clim.%(period)s.%(date)s.%(version)s.nc' #?? + +# Simple mask and average functions to get us started, then add gaussian interpolation. 
+# MODIS L3 SST product, qual_sst is [-1, 2] - =0 is best data, can add =1 for better coverage +#def qcMask(var, mask): return N.ma.array(var, mask=N.ma.make_mask(mask == 0)) + +def qcMask(var, mask): return N.ma.masked_where(mask != 0, var) +#def qcMask(var, mask): return N.ma.masked_where(mask < 0, var) + +def splitModisSst(seq, n): + for chunk in splitByNDays(seq, n, re.compile(r'(...).L3m')): + yield chunk + +AveragingFunctions = {'pixelMean': mean, 'gaussInterp': gaussInterp, 'spatialFilter': spatialFilter} + +PixelMeanConfig = {'name': 'pixelMean'} + +GaussInterpConfig = {'name': 'gaussInterp', + 'latGrid': None, 'lonGrid': None, # None means use input lat/lon grid + 'wlat': 3, 'wlon': 3, + 'slat': 0.15, 'slon': 0.15, 'stime': 1, + 'vfactor': -0.6931, 'missingValue': default_fillvals['f4']} + +GaussInterpConfig1a = {'name': 'gaussInterp', + 'latGrid': None, 'lonGrid': None, # None means use input lat/lon grid + 'wlat': 0.30, 'wlon': 0.30, + 'slat': 0.15, 'slon': 0.15, 'stime': 1, + 'vfactor': -0.6931, 'missingValue': default_fillvals['f4']} + +GaussInterpConfig1b = {'name': 'gaussInterp', + 'latGrid': None, 'lonGrid': None, # None means use input lat/lon grid + 'wlat': 0.08, 'wlon': 0.08, + 'slat': 0.15, 'slon': 0.15, 'stime': 1, + 'vfactor': -0.6931, 'missingValue': default_fillvals['f4']} + +GaussInterpConfig2 = {'name': 'gaussInterp', + 'latGrid': (89.5, -89.5, -0.25), 'lonGrid': (-180., 179., 0.25), + 'wlat': 2., 'wlon': 2., + 'slat': 0.15, 'slon': 0.15, 'stime': 1, + 'vfactor': -0.6931, 'missingValue': default_fillvals['f4']} + +FilterGaussian = [[1, 2, 1], [2, 4, 2], [1, 2, 1]] # divide by 16 +FilterLowPass = [[1, 1, 1], [1, 1, 1], [1, 1, 1]] # divide by 9 + +SpatialFilterConfig1 = {'name': 'spatialFilter', 'normalization': 16., + 'spatialFilter': N.array(FilterGaussian, dtype=N.int32), + 'missingValue': default_fillvals['f4']} + +SpatialFilterConfig2 = {'name': 'spatialFilter', 'normalization': 9., + 'spatialFilter': N.array(FilterLowPass, 
dtype=N.int32), + 'missingValue': default_fillvals['f4']} + +# Directory to cache retrieved files in +CachePath = '~/cache' + + + +def climByAveragingPeriods(urls, # list of (daily) granule URLs for a long time period (e.g. a year) + nEpochs, # compute a climatology for every N epochs (days) by 'averaging' + nWindow, # number of epochs in window needed for averaging + variable, # name of primary variable in file + mask, # name of mask variable + coordinates, # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon') + splitFn=splitModisSst, # split function to use to partition the input URL list + maskFn=qcMask, # mask function to compute mask from mask variable + averager='pixelMean', # averaging function to use, one of ['pixelMean', 'gaussInterp', 'spatialFilter'] + averagingConfig={}, # dict of parameters to control the averaging function (e.g. gaussInterp) + optimization='fortran', # optimization mode (fortran or cython) + mode='multicore', # Map across time periods of N-days for concurrent work, executed by: + # 'multicore' using pysparkling or 'spark' using Spark/Mesos cluster + numNodes=1, # number of cluster nodes to use + nWorkers=4, # number of parallel workers per node + averagingFunctions=AveragingFunctions, # dict of possible averaging functions + legalModes=ExecutionModes, # list of possible execution modes + cachePath=CachePath # directory to cache retrieved files in + ): + '''Compute a climatology every N days by applying a mask and averaging function. +Writes the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary. 
+***Assumption: This routine assumes that the N grids will fit in memory.*** + ''' + if averagingConfig['name'] == 'gaussInterp': + averagingConfig['wlat'] = nNeighbors + averagingConfig['wlon'] = nNeighbors + try: + averageFn = averagingFunctions[averager] + except: + print >>sys.stderr, 'climatology: Error, Averaging function must be one of: %s' % str(averagingFunctions) + sys.exit(1) + + urlSplits = [s for s in splitFn(urls, nEpochs)] + + if mode == 'multicore': + pass + elif mode == 'spark': + pass + + def climsContoured(urls, plot=None, fillValue=default_fillvals['f4'], format='NETCDF4', cachePath=cachePath): + n = len(urls) + if VERBOSE: print >>sys.stderr, urls + + var = climByAveraging(urls, variable, mask, coordinates, maskFn, averageFn, averagingConfig, optimization, cachePath) + + fn = os.path.split(urls[0])[1] + inFile = os.path.join(cachePath, fn) + method = averagingConfig['name'] + fn = os.path.splitext(fn)[0] + day = fn[5:8] + nDays = int(var['time'][0]) + + if 'wlat' in averagingConfig: + wlat = averagingConfig['wlat'] + else: + wlat = 1 + if int(wlat) == wlat: + outFile = 'A%s.L3m_%dday_clim_sst_4km_%s_%dnbrs.nc' % (day, nDays, method, int(wlat)) # mark each file with first day in period + else: + outFile = 'A%s.L3m_%dday_clim_sst_4km_%s_%4.2fnbrs.nc' % (day, nDays, method, wlat) # mark each file with first day in period + + outFile = writeOutNetcdfVars(var, variable, mask, coordinates, inFile, outFile, fillValue, format) + + if plot == 'contour': + figFile = contourMap(var, variable, coordinates, n, outFile) + elif plot == 'histogram': +# figFile = histogram(var, variable, n, outFile) + figFile = None + else: + figFile = None + return (outFile, figFile) + + + return results + + +def accumulateClim(urls, # list of granule URLs for a time period + variable, # name of primary variable in file + mask, # name of mask variable + coordinates, # names of coordinate arrays to read and pass on (e.g. 
'lat' and 'lon') + maskFn=qcMask, # mask function to compute mask from mask variable + averageFn=mean, # averaging function to use + averagingConfig={}, # parameters to control averaging function (e.g. gaussInterp) + optimization='fortran', # optimization mode (fortran or cython) + cachePath=CachePath + ): + '''Compute a climatology over N arrays by applying a mask and averaging function. +Returns the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary. +***Assumption: This routine assumes that the N grids will fit in memory.*** + ''' + print >>sys.stderr, 'accumulateClim: Doing %s ...' % averagingConfig['name'] + varList = [variable, mask] + for i, url in enumerate(urls): + try: + path = retrieveFile(url, cachePath) + fn = os.path.split(path)[1] + vtime = int(fn[5:8]) # KLUDGE: extract DOY from filename + except: + print >>sys.stderr, 'climByAveraging: Error, continuing without file %s' % url + continue + if path is None: continue + try: + print >>sys.stderr, 'Reading file %s ...' % path + var, fh = getVariables(path, varList, arrayOnly=True, order='F', set_auto_mask=False) # return dict of variable objects by name + except: + print >>sys.stderr, 'climByAveraging: Error, cannot read file %s' % path + continue + if i == 0: + dtype = var[variable].dtype + if 'int' in dtype.name: dtype = N.float32 + shape = var[variable].shape + vsum = N.ma.empty(shape, dtype, order='F') + vcount = N.ma.empty(shape, dtype, order='F') + emptyVar = N.array(N.ma.masked_all(var[variable].shape, dtype), order='F') # totally masked variable array for missing or bad file reads + + print >>sys.stderr, 'Read coordinates ...' 
+ var, fh = getVariables(path, coordinates, var, arrayOnly=True, order='F') # read coordinate arrays and add to dict + + var[variable] = maskFn(var[variable], var[mask]) # apply quality mask variable to get numpy MA, turned off masking done by netCDF4 library + + # Echo variable range for sanity check + vals = var[variable].compressed() + print >>sys.stderr, 'Variable Range: min, max:', vals.min(), vals.max() + + if averagingConfig['name'] == 'pixelMean': + vsum += var[variable] # update accumulators + vcount += ~var[mask] + + elif averagingConfig['name'] == 'gaussInterp': + var[variable] = accum + c = averagingConfig + latGrid = c['latGrid']; lonGrid = c['lonGrid'] + if latGrid is not None and lonGrid is not None: + outlat = N.arange(latGrid[0], latGrid[1]+latGrid[2], latGrid[2], dtype=N.float32, order='F') + outlon = N.arange(lonGrid[0], lonGrid[1]+lonGrid[2], lonGrid[2], dtype=N.float32, order='F') + else: + outlat = N.array(var[coordinates[1]], dtype=N.float32, order='F') + outlon = N.array(var[coordinates[2]], dtype=N.float32, order='F') + varNames = [variable] + coordinates + start = time.time() + avg, weight, status = \ + gaussInterp(var, varNames, outlat, outlon, c['wlat'], c['wlon'], + c['slat'], c['slon'], c['stime'], c['vfactor'], c['missingValue'], + VERBOSE, optimization) + end = time.time() + vcount = weight.astype(N.float32) + vsum = avg + print >>sys.stderr, 'gaussInterp execution time:', (end - start) + elif averagingConfig['name'] == 'spatialFilter': + var[variable] = accum + c = averagingConfig + varNames = [variable] + coordinates + start = time.time() + avg, vcount, status = \ + spatialFilter(var, varNames, c['spatialFilter'], c['normalization'], + c['missingValue'], VERBOSE, optimization) + vsum = avg + end = time.time() + print >>sys.stderr, 'spatialFilter execution time:', (end - start) + close(fh) + + return (vcount, vsum) + + +def writeOutNetcdfVars(var, variable, mask, coordinates, inFile, outFile, fillValue=None, format='NETCDF4'): + 
'''Construct output bundle of arrays with NetCDF dimensions and attributes. +Output variables and attributes will have same names as the input file. + ''' + din = Dataset(inFile, 'r') + dout = Dataset(outFile, 'w', format=format) + print >>sys.stderr, 'Writing %s ...' % outFile + + # Transfer global attributes from input file + for a in din.ncattrs(): + dout.setncattr(a, din.getncattr(a)) + + # Add dimensions and variables, copying data + coordDim = [dout.createDimension(coord, var['out'+coord].shape[0]) for coord in coordinates] # here output lon, lat, etc. + for coord in coordinates: + v = dout.createVariable(coord, var['out'+coord].dtype, (coord,)) + v[:] = var['out'+coord][:] + primaryVar = dout.createVariable(variable, var['out'+variable].dtype, coordinates, fill_value=fillValue) + primaryVar[:] = var['out'+variable][:] # transfer array + maskVar = dout.createVariable(mask, 'i1', coordinates) + maskVar[:] = var['out'+mask].astype('i1')[:] + + # Transfer variable attributes from input file + for k,v in dout.variables.iteritems(): + for a in din.variables[k].ncattrs(): + if a == 'scale_factor' or a == 'add_offset' or a == '_FillValue': continue + v.setncattr(a, din.variables[k].getncattr(a)) + if k == variable: + try: +# if fillValue == None: fillValue = din.variables[k].getncattr('_FillValue') # total kludge + if fillValue == None: fillValue = default_fillvals['f4'] +# print >>sys.stderr, default_fillvals +# v.setncattr('_FillValue', fillValue) # set proper _FillValue for climatology array + v.setncattr('missing_value', fillValue) + print >>sys.stderr, 'Setting missing_value for primary variable %s to %f' % (variable, fillValue) + except: + print >>sys.stderr, 'writeOutNetcdfVars: Warning, for variable %s no fill value specified or derivable from inputs.' 
% variable + din.close() + dout.close() + return outFile + + +def contourMap(var, variable, coordinates, n, outFile): + figFile = os.path.splitext(outFile)[0] + '_hist.png' + # TODO: Downscale variable array (SST) before contouring, matplotlib is TOO slow on large arrays + vals = var[variable][:] + + # Fixed color scale, write file, turn off auto borders, set title, reverse lat direction so monotonically increasing?? + imageMap(var[coordinates[1]][:], var[coordinates[0]][:], var[variable][:], + vmin=-2., vmax=45., outFile=figFile, autoBorders=False, + title='%s %d-day Mean from %s' % (variable.upper(), n, outFile)) + print >>sys.stderr, 'Writing contour plot to %s' % figFile + return figFile + + +def histogram(vals, variable, n, outFile): + figFile = os.path.splitext(outFile)[0] + '_hist.png' + M.clf() +# M.hist(vals, 47, (-2., 45.)) + M.hist(vals, 94) + M.xlim(-5, 45) + M.xlabel('SST in degrees Celsius') + M.ylim(0, 300000) + M.ylabel('Count') + M.title('Histogram of %s %d-day Mean from %s' % (variable.upper(), n, outFile)) + M.show() + print >>sys.stderr, 'Writing histogram plot to %s' % figFile + M.savefig(figFile) + return figFile + + +def dailyFile2date(path, offset=1): + '''Convert YYYYDOY string in filename to date.''' + fn = os.path.split(path)[1] + year = int(fn[offset:offset+4]) + doy = int(fn[offset+5:offset+8]) + return fn[5:15].replace('.', '/') + + +def formatRegion(r): + """Format lat/lon region specifier as string suitable for file name.""" + if isinstance(r, str): + return r + else: + strs = [str(i).replace('-', 'm') for i in r] + return 'region-%s-%sby%s-%s' % tuple(strs) + + +def formatGrid(r): + """Format lat/lon grid resolution specifier as string suitable for file name.""" + if isinstance(r, str): + return r + else: + return str(r[0]) + 'by' + str(r[1]) + + +def main(args): + nEpochs = int(args[0]) + nWindow = int(args[1]) + nNeighbors = int(args[2]) + averager = args[3] + optimization = args[4] + mode = args[5] + nWorkers = int(args[6]) + 
urlFile = args[7] + urls = [s.strip() for s in open(urlFile, 'r')] + if averager == 'gaussInterp': + results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'], + averager=averager, optimization=optimization, averagingConfig=GaussInterpConfig, + mode=mode, nWorkers=nWorkers) + elif averager == 'spatialFilter': + results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'], + averager=averager, optimization=optimization, averagingConfig=SpatialFilterConfig1, + mode=mode, nWorkers=nWorkers) + elif averager == 'pixelMean': + results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'], + averager=averager, optimization=optimization, averagingConfig=PixelMeanConfig, + mode=mode, nWorkers=nWorkers) + else: + print >>sys.stderr, 'climatology2: Error, averager must be one of', AveragingFunctions.keys() + sys.exit(1) + + if results[0][1] is not None: + makeMovie([r[1] for r in results], 'clim.mpg') + return results + + +if __name__ == '__main__': + print main(sys.argv[1:]) + + +# Old Tests: +# python climatology2.py 5 5 0 pixelMean fortran sequential 1 urls_sst_10days.txt +# python climatology2.py 5 5 3 gaussInterp fortran sequential 1 urls_sst_10days.txt + +# python climatology2.py 5 5 0 pixelMean fortran sequential 1 urls_sst_40days.txt +# python climatology2.py 5 5 0 pixelMean fortran multicore 8 urls_sst_40days.txt +# python climatology2.py 5 5 3 gaussInterp fortran multicore 8 urls_sst_40days.txt + +# Old Production: +# python climatology2.py 5 5 0 pixelMean fortran multicore 16 urls_sst_2015.txt >& log & +# python climatology2.py 10 10 0 pixelMean fortran multicore 16 urls_sst_2015.txt >& log & +# python climatology2.py 5 5 3 gaussInterp fortran multicore 16 urls_sst_2015.txt >& log & + + +# Tests: +# python climatology2.py 5 5 0 pixelMean fortran sequential 1 urls_sst_daynight_5days_sorted.txt +# python climatology2.py 5 5 0 pixelMean 
fortran multicore 4 urls_sst_daynight_5days_sorted.txt +# python climatology2.py 5 5 3 gaussInterp fortran sequential 1 urls_sst_daynight_5days_sorted.txt +# python climatology2.py 5 5 3 gaussInterp fortran multicore 4 urls_sst_daynight_5days_sorted.txt +# python climatology2.py 5 7 1 spatialFilter fortran sequential 1 urls_sst_daynight_5days_sorted.txt + +# Test number of neighbors needed: +# python climatology2.py 5 7 3 gaussInterp fortran multicore 4 urls_sst_daynight_20days_sorted.txt + + +# Production: +# python climatology2.py 5 7 0 pixelMean fortran multicore 4 urls_sst_daynight_2003_2015_sorted.txt +# python climatology2.py 5 7 3 gaussInterp fortran sequential 1 urls_sst_daynight_2003_2015_sorted.txt +# python climatology2.py 5 7 3 gaussInterp fortran multicore 4 urls_sst_daynight_2003_2015_sorted.txt +# python climatology2.py 5 7 1 spatialFilter fortran multicore 4 urls_sst_daynight_2003_2015_sorted.txt + http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/cluster.py ---------------------------------------------------------------------- diff --git a/climatology/clim/cluster.py b/climatology/clim/cluster.py new file mode 100644 index 0000000..7a76ad9 --- /dev/null +++ b/climatology/clim/cluster.py @@ -0,0 +1,84 @@ +# +# cluster.py -- Parallel map function that uses a distributed cluster and multicore within each node +# + +import os, sys +from split import fixedSplit +import pp +#import dispy + +Servers = ("server-1", "server-2", "server-3", "server-4") +NCores = 8 + + +def splitSeq(seq, n): + '''Split a sequence into N almost even parts.''' + m = int(len(seq)/n) + return fixedSplit(seq, m) + + +def dmap(func, # function to parallel map across split sequence + seq, # sequence of data + nCores, # number of cores to use on each node + servers=[], # list of servers in the cluster + splitterFn=splitSeq, # function to use to split the sequence of data + ): + '''Parallel operations for a cluster of nodes, each with multiple 
cores. + ''' + nServers = len(servers) + if nServers == 0: servers = ('localhost',) + jobServer = pp.Server(nCores, ppservers=servers) + splits = splitSeq(seq, nServers * nCores) + jobs = [(s, jobServer.submit(func, (s,), ('splitSeq',), globals=globals())) for s in splits] + results = [(j[0], j[1]()) for j in jobs] + return results + + +class PPCluster: + '''Parallel operations for a cluster of nodes, each with multiple cores. + ''' + def __init__(self, nCores=NCores, # number of cores to use on each node + servers=Servers, # list of servers in the cluster + splitterFn=splitSeq, # function to use to split the sequence of data + ): + self.nCores = nCores + self.servers = servers + self.nServers = len(servers) + self.splitterFn = splitterFn + self.jobServer = pp.Server(nCores, ppservers=servers) + + def dmap(self, func, seq): + '''Distributed map function that automatically uses a cluster of nodes and a set number of cores. +A sequence of data is split across the cluster. + ''' + splits = splitSeq(seq, self.nServers * self.nCores) + jobs = [(s, self.jobServer.submit(func, (s,), (splitterFn,), globals=globals())) for s in splits] + results = [(j[0], j[1]()) for j in jobs] + return results + + +# Tests follow. 
+ +def extractDay(url, substring): + '''Extract integer DOY from filename like urls.''' + i = url.find(substring) + return int(url[5:8]) + + +def main(args): + nCores = int(args[0]) + servers = tuple(eval(args[1])) + urlFile = args[2] + urls = [s.strip() for s in open(urlFile, 'r')] + +# results = PPCluster(nCores, servers).dmap(lambda u: extractDOY(u, 'A2015'), urls) + results = dmap(lambda u: extractDOY(u, 'A2015'), urls, nCores, servers, splitSeq) + + +if __name__ == '__main__': + print main(sys.argv[1:]) + +# python cluster.py 8 '["deepdata-1"]' urls_sst_2015.txt +# python cluster.py 1 '["deepdata-1", "deepdata-2", "deepdata-3", "deepdata-4"]' urls_sst_2015.txt +# python cluster.py 8 '["deepdata-1", "deepdata-2", "deepdata-3", "deepdata-4"]' urls_sst_2015.txt + http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/cluster2.py ---------------------------------------------------------------------- diff --git a/climatology/clim/cluster2.py b/climatology/clim/cluster2.py new file mode 100644 index 0000000..7a76ad9 --- /dev/null +++ b/climatology/clim/cluster2.py @@ -0,0 +1,84 @@ +# +# cluster.py -- Parallel map function that uses a distributed cluster and multicore within each node +# + +import os, sys +from split import fixedSplit +import pp +#import dispy + +Servers = ("server-1", "server-2", "server-3", "server-4") +NCores = 8 + + +def splitSeq(seq, n): + '''Split a sequence into N almost even parts.''' + m = int(len(seq)/n) + return fixedSplit(seq, m) + + +def dmap(func, # function to parallel map across split sequence + seq, # sequence of data + nCores, # number of cores to use on each node + servers=[], # list of servers in the cluster + splitterFn=splitSeq, # function to use to split the sequence of data + ): + '''Parallel operations for a cluster of nodes, each with multiple cores. 
+ ''' + nServers = len(servers) + if nServers == 0: servers = ('localhost',) + jobServer = pp.Server(nCores, ppservers=servers) + splits = splitSeq(seq, nServers * nCores) + jobs = [(s, jobServer.submit(func, (s,), ('splitSeq',), globals=globals())) for s in splits] + results = [(j[0], j[1]()) for j in jobs] + return results + + +class PPCluster: + '''Parallel operations for a cluster of nodes, each with multiple cores. + ''' + def __init__(self, nCores=NCores, # number of cores to use on each node + servers=Servers, # list of servers in the cluster + splitterFn=splitSeq, # function to use to split the sequence of data + ): + self.nCores = nCores + self.servers = servers + self.nServers = len(servers) + self.splitterFn = splitterFn + self.jobServer = pp.Server(nCores, ppservers=servers) + + def dmap(self, func, seq): + '''Distributed map function that automatically uses a cluster of nodes and a set number of cores. +A sequence of data is split across the cluster. + ''' + splits = splitSeq(seq, self.nServers * self.nCores) + jobs = [(s, self.jobServer.submit(func, (s,), (splitterFn,), globals=globals())) for s in splits] + results = [(j[0], j[1]()) for j in jobs] + return results + + +# Tests follow. 
+ +def extractDay(url, substring): + '''Extract integer DOY from filename like urls.''' + i = url.find(substring) + return int(url[5:8]) + + +def main(args): + nCores = int(args[0]) + servers = tuple(eval(args[1])) + urlFile = args[2] + urls = [s.strip() for s in open(urlFile, 'r')] + +# results = PPCluster(nCores, servers).dmap(lambda u: extractDOY(u, 'A2015'), urls) + results = dmap(lambda u: extractDOY(u, 'A2015'), urls, nCores, servers, splitSeq) + + +if __name__ == '__main__': + print main(sys.argv[1:]) + +# python cluster.py 8 '["deepdata-1"]' urls_sst_2015.txt +# python cluster.py 1 '["deepdata-1", "deepdata-2", "deepdata-3", "deepdata-4"]' urls_sst_2015.txt +# python cluster.py 8 '["deepdata-1", "deepdata-2", "deepdata-3", "deepdata-4"]' urls_sst_2015.txt + http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/datasets.py ---------------------------------------------------------------------- diff --git a/climatology/clim/datasets.py b/climatology/clim/datasets.py new file mode 100644 index 0000000..226935b --- /dev/null +++ b/climatology/clim/datasets.py @@ -0,0 +1,317 @@ +''' +datasets.py -- Routines for dataset-specfic capabilities: file handling, readers, etc. + +One Class for each dataset containing static methods and constants/templates, etc. 
+ +''' + +import sys, os, re, datetime + +import numpy as np + +from split import splitByNDaysKeyed, groupByKeys, extractKeys + + +def splitModisAod(seq, n): + return splitByNDaysKeyed(seq, n, re.compile(r'(....)(..)(..)'), lambda y, m, d: ymd2doy(y, m, d)) + + +def splitAvhrrSst(seq, n): + return splitByNDays_Avhrr(seq, n, re.compile(r'^(....)(..)(..)')) + + +class ModisSst: + ExpectedRunTime = "28m" + UrlsPath = "/data/share/datasets/MODIS_L3_AQUA_11UM_V2014.0_4KM_DAILY/daily_data/A*SST*.nc" + ExampleFileName = 'A2010303.L3m_DAY_NSST_sst_4km.nc' + GetKeysRegex = r'A(....)(...).L3m_DAY_(.)S' + + VariableName = 'sst' + Mask = None + Coordinates = ['lat', 'lon'] + + OutputClimTemplate = '' + + @staticmethod + def keysTransformer(s): return (s[1], s[0], s[2]) # DOY, YEAR, N=night / S=day + + @staticmethod + def getKeys(url): + return extractKeys(url, ModisSst.GetKeysRegex, ModisSst.keysTransformer) + + @staticmethod + def split(seq, n): + return [u for u in splitByNDaysKeyed(seq, n, ModisSst.GetKeysRegex, ModisSst.keysTransformer)] + + @staticmethod + def genOutputName(doy, variable, nEpochs, averagingConfig): + return 'A%03d.L3m_%s_%dday_clim_%s.nc' % ( + doy, variable, nEpochs, averagingConfig['name']) # mark each file with first day in period + + +class ModisChlor: + ExpectedRunTime = "11m" + UrlsPath = "/Users/greguska/githubprojects/nexus/nexus-ingest/developer-box/data/modis_aqua_chl/A*chlor*.nc" + ExampleFileName = "A2013187.L3m_DAY_CHL_chlor_a_4km.nc" + GetKeysRegex = r'A(....)(...).L3m.*CHL' + + Variable = 'chlor_a' + Mask = None + Coordinates = ['lat', 'lon'] + + OutputClimTemplate = '' + + @staticmethod + def keysTransformer(s): return (s[1], s[0]) # DOY, YEAR + + @staticmethod + def getKeys(url): + return extractKeys(url, ModisChlor.GetKeysRegex, ModisChlor.keysTransformer) + + @staticmethod + def split(seq, n): + return [u for u in splitByNDaysKeyed(seq, n, ModisChlor.GetKeysRegex, ModisChlor.keysTransformer)] + + @staticmethod + def genOutputName(doy, 
variable, nEpochs, averagingConfig): + return 'A%03d.L3m_%s_%dday_clim_%s.nc' % ( + doy, variable, nEpochs, averagingConfig['name']) # mark each file with first day in period + + +class MeasuresSsh: + ExpectedRunTime = "2m22s" + UrlsPath = "/data/share/datasets/MEASURES_SLA_JPL_1603/daily_data/ssh_grids_v1609*12.nc" + ExampleFileName = "ssh_grids_v1609_2006120812.nc" + GetKeysRegex = r'ssh.*v1609_(....)(..)(..)12.nc' + + Variable = 'SLA' # sea level anomaly estimate + Mask = None + Coordinates = ['Longitude', 'Latitude'] # Time is first (len=1) coordinate, will be removed + + OutputClimTemplate = '' + + @staticmethod + def keysTransformer(s): return (ymd2doy(s[0], s[1], s[2]), s[0]) # DOY, YEAR + + @staticmethod + def getKeys(url): + return extractKeys(url, MeasuresSsh.GetKeysRegex, MeasuresSsh.keysTransformer) + + @staticmethod + def split(seq, n): + return [u for u in splitByNDaysKeyed(seq, n, MeasuresSsh.GetKeysRegex, MeasuresSsh.keysTransformer)] + + @staticmethod + def genOutputName(doy, variable, nEpochs, averagingConfig): + return "ssh_grids_v1609_%03d_%dday_clim_%s.nc" % (int(doy), nEpochs, averagingConfig['name']) + + +class CCMPWind: + ExpectedRunTime = "?" 
+ UrlsPath = "/data/share/datasets/CCMP_V2.0_L3.0/daily_data/CCMP_Wind*_V02.0_L3.0_RSS_uncompressed.nc" + ExampleFileName = "CCMP_Wind_Analysis_20160522_V02.0_L3.0_RSS_uncompressed.nc" + GetKeysRegex = r'CCMP_Wind_Analysis_(....)(..)(..)_V.*.nc' + + Variable = 'Wind_Magnitude' # to be computed as sqrt(uwnd^2 + vwnd^2) + Mask = None + Coordinates = ['latitude', 'longitude'] + + OutputClimTemplate = '' + + @staticmethod + def keysTransformer(s): + return (ymd2doy(s[0], s[1], s[2]), s[0]) # DOY, YEAR + + @staticmethod + def getKeys(url): + return extractKeys(url, CCMPWind.GetKeysRegex, CCMPWind.keysTransformer) + + @staticmethod + def split(seq, n): + return [u for u in splitByNDaysKeyed(seq, n, CCMPWind.GetKeysRegex, CCMPWind.keysTransformer)] + + @staticmethod + def genOutputName(doy, variable, nEpochs, averagingConfig): + return "CCMP_Wind_Analysis_V02.0_L3.0_RSS_%03d_%dday_clim_%s.nc" % (int(doy), nEpochs, averagingConfig['name']) + + @staticmethod + def readAndMask(url, variable, mask=None, cachePath='/tmp/cache', hdfsPath=None): + """ + Read a variable from a netCDF or HDF file and return a numpy masked array. + If the URL is remote or HDFS, first retrieve the file into a cache directory. 
+ """ + from variables import getVariables, close + v = None + if mask: + variables = [variable, mask] + else: + variables = [variable] + try: + from cache import retrieveFile + path = retrieveFile(url, cachePath, hdfsPath) + except: + print >> sys.stderr, 'readAndMask: Error, continuing without file %s' % url + return v + + if CCMPWind.Variable in variables: + var, fh = getVariables(path, ['uwnd','vwnd'], arrayOnly=True, + set_auto_mask=True) # return dict of variable objects by name + uwnd_avg = np.average(var['uwnd'], axis=0) + vwnd_avg = np.average(var['vwnd'], axis=0) + wind_magnitude = np.sqrt(np.add(np.multiply(uwnd_avg, uwnd_avg), np.multiply(vwnd_avg, vwnd_avg))) + v = wind_magnitude + if v.shape[0] == 1: v = v[0] # throw away trivial time dimension for CF-style files + close(fh) + else: + try: + print >> sys.stderr, 'Reading variable %s from %s' % (variable, path) + var, fh = getVariables(path, variables, arrayOnly=True, + set_auto_mask=True) # return dict of variable objects by name + v = var[ + variable] # could be masked array + if v.shape[0] == 1: v = v[0] # throw away trivial time dimension for CF-style files + close(fh) + except: + print >> sys.stderr, 'readAndMask: Error, cannot read variable %s from file %s' % (variable, path) + + return v + +class MonthlyClimDataset: + ExpectedRunTime = "2m" + UrlsPath = '' + ExampleFileName = '' + GetKeysRegex = r'(YYYY)(MM)(DD)' # Regex to extract year, month, day + Variable = 'var' # Variable name in granule + Mask = None + Coordinates = ['lat', 'lon'] + OutputClimTemplate = '' + + @staticmethod + def keysTransformer(s): + return (s[1],s[0]) # MONTH, YEAR + + @staticmethod + def getKeys(url): + return extractKeys(url, MonthlyClimDataset.GetKeysRegex, + MonthlyClimDataset.keysTransformer) + + @staticmethod + def split(seq, n): + return [u for u in splitByNDaysKeyed(seq, n, + MonthlyClimDataset.GetKeysRegex, + MonthlyClimDataset.keysTransformer)] + + @staticmethod + def genOutputName(month, variable, nEpochs, 
averagingConfig): + # Here we use the 15th of the month to get DOY and just use any + # non-leap year. + doy = datetime2doy(ymd2datetime(2017, month, 15)) + return 'monthly_clim_%s_%03d_month%02d_nepochs%d_%s.nc' % ( + variable, doy, month, nEpochs, + averagingConfig['name']) # mark each file with month + + +class SMAP_L3M_SSS(MonthlyClimDataset): + UrlsPath = "/data/share/datasets/SMAP_L3_SSS/monthly/RSS_smap_SSS_monthly_*.nc" + ExampleFileName = 'RSS_smap_SSS_monthly_2015_04_v02.0.nc' + GetKeysRegex = r'RSS_smap_SSS_monthly_(....)_(..)_v02' + Variable = 'sss_smap' + + @staticmethod + def getKeys(url): + return extractKeys(url, SMAP_L3M_SSS.GetKeysRegex, + SMAP_L3M_SSS.keysTransformer) + + @staticmethod + def split(seq, n): + return [u for u in splitByNDaysKeyed(seq, n, + SMAP_L3M_SSS.GetKeysRegex, + SMAP_L3M_SSS.keysTransformer)] + + @staticmethod + def genOutputName(month, variable, nEpochs, averagingConfig): + # Here we use the 15th of the month to get DOY and just use any + # non-leap year. + doy = datetime2doy(ymd2datetime(2017, month, 15)) + return '%s_L3m_clim_doy%03d_month%02d_nepochs%d_%s.nc' % ( + variable, doy, month, nEpochs, + averagingConfig['name']) # mark each file with month + +class GRACE_Tellus(MonthlyClimDataset): + GetKeysRegex = r'GRCTellus.JPL.(....)(..)(..).GLO' + Variable = 'lwe_thickness' # Liquid_Water_Equivalent_Thickness + + @staticmethod + def getKeys(url): + return extractKeys(url, GRACE_Tellus.GetKeysRegex, + GRACE_Tellus.keysTransformer) + + @staticmethod + def split(seq, n): + return [u for u in splitByNDaysKeyed(seq, n, + GRACE_Tellus.GetKeysRegex, + GRACE_Tellus.keysTransformer)] + + @staticmethod + def genOutputName(month, variable, nEpochs, averagingConfig): + # Here we use the 15th of the month to get DOY and just use any + # non-leap year. 
+ doy = datetime2doy(ymd2datetime(2017, month, 15)) + return 'GRACE_Tellus_monthly_%s_%03d_month%02d_nepochs%d_%s.nc' % ( + variable, doy, month, nEpochs, + averagingConfig['name']) # mark each file with month + +class GRACE_Tellus_monthly_land(GRACE_Tellus): + UrlsPath = "/data/share/datasets/GRACE_Tellus/monthly_land/GRCTellus.JPL.*.nc" + ExampleFileName = "GRCTellus.JPL.20150122.GLO.RL05M_1.MSCNv02CRIv02.land.nc" + + @staticmethod + def genOutputName(month, variable, nEpochs, averagingConfig): + # Here we use the 15th of the month to get DOY and just use any + # non-leap year. + doy = datetime2doy(ymd2datetime(2017, month, 15)) + return 'GRACE_Tellus_monthly_land_%s_%03d_month%02d_nepochs%d_%s.nc' % ( + variable, doy, month, nEpochs, + averagingConfig['name']) # mark each file with month + +class GRACE_Tellus_monthly_ocean(GRACE_Tellus): + UrlsPath = "/data/share/datasets/GRACE_Tellus/monthly_ocean/GRCTellus.JPL.*.nc" + ExampleFileName = "GRCTellus.JPL.20150122.GLO.RL05M_1.MSCNv02CRIv02.ocean.nc" + + @staticmethod + def genOutputName(month, variable, nEpochs, averagingConfig): + # Here we use the 15th of the month to get DOY and just use any + # non-leap year. + doy = datetime2doy(ymd2datetime(2017, month, 15)) + return 'GRACE_Tellus_monthly_ocean_%s_%03d_month%02d_nepochs%d_%s.nc'%( + variable, doy, month, nEpochs, + averagingConfig['name']) # mark each file with month + + +DatasetList = {'ModisSst': ModisSst, 'ModisChlor': ModisChlor, + 'MeasuresSsh': MeasuresSsh, 'CCMPWind': CCMPWind, + 'SMAP_L3M_SSS': SMAP_L3M_SSS, + 'GRACE_Tellus_monthly_ocean': GRACE_Tellus_monthly_ocean, + 'GRACE_Tellus_monthly_land': GRACE_Tellus_monthly_land} + + +# Utils follow. 
+def ymd2doy(year, mon, day): + return datetime2doy(ymd2datetime(year, mon, day)) + + +def ymd2datetime(y, m, d): + y, m, d = map(int, (y, m, d)) + return datetime.datetime(y, m, d) + + +def datetime2doy(dt): + return int(dt.strftime('%j')) + + +def doy2datetime(year, doy): + '''Convert year and DOY (day of year) to datetime object.''' + return datetime.datetime(int(year), 1, 1) + datetime.timedelta(int(doy) - 1) + + +def doy2month(year, doy): return doy2datetime(year, doy).strftime('%m') http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/dparkTest.py ---------------------------------------------------------------------- diff --git a/climatology/clim/dparkTest.py b/climatology/clim/dparkTest.py new file mode 100644 index 0000000..be92cbd --- /dev/null +++ b/climatology/clim/dparkTest.py @@ -0,0 +1,9 @@ + +import dpark + +lines = dpark.textFile("/usr/share/dict/words", 128) +#lines = dpark.textFile("/usr/share/doc/gcc-4.4.7/README.Portability", 128) +words = lines.flatMap(lambda x:x.split()).map(lambda x:(x,1)) +wc = words.reduceByKey(lambda x,y:x+y).collectAsMap() +print wc[wc.keys[0]] + http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/gaussInterp.py ---------------------------------------------------------------------- diff --git a/climatology/clim/gaussInterp.py b/climatology/clim/gaussInterp.py new file mode 100644 index 0000000..f694d7a --- /dev/null +++ b/climatology/clim/gaussInterp.py @@ -0,0 +1,43 @@ +# +# gaussInterp routine -- Gaussian weighted smoothing in lat, lon, and time +# +# Based on Ed Armstrong's routines. +# +# Calls into optimized routines in Fortran or cython. 
# See gaussInterp_f.f or gaussInterp.pyx
#

import numpy as N
import time
from gaussInterp_f import gaussinterp_f


def gaussInterp(var,                   # bundle of input arrays: masked variable, coordinates
                varNames,              # names in read order: primary variable, then time, lat, lon
                outlat, outlon,        # output lat/lon coordinate vectors
                wlat, wlon,            # window of lat/lon neighbors to gaussian weight, in delta lat (degrees)
                slat, slon, stime,     # gaussian sigmas in lat (deg), lon (deg) & time (days)
                vfactor=-0.6931,       # factor in front of gaussian expression
                missingValue=-9999.,   # value to mark missing values in interp result
                verbose=1,             # integer to set verbosity level
                optimization='fortran'):  # only 'fortran' is implemented in this module
    '''Gaussian interpolate in lat, lon, and time to a different lat/lon grid, and over a time window to the center time.
Bundle of arrays (var) contains a 3D masked variable and coordinate arrays for lat, lon, and time read from netdf/hdf files.
Returns the 2D interpolated variable (masked), the weight array, and a status for failures.
    '''
    v = var[varNames[0]][:]    # real*4 in the Fortran code -- dtype compatibility assumed, TODO confirm
    # BUG FIX: use getmaskarray, not getmask.  getmask returns the scalar
    # ma.nomask for a fully unmasked array, which is not the full-size
    # integer*1 mask array the Fortran routine expects.
    vmask = N.ma.getmaskarray(v).astype('int8')   # integer*1 for Fortran
    vtime = var[varNames[1]][:]   # integer*4 in Fortran
    lat = var[varNames[2]][:]     # real*4
    lon = var[varNames[3]][:]     # real*4
    if optimization == 'fortran':
        vinterp, vweight, status = \
            gaussinterp_f(v, vmask, vtime, lat, lon,
                          outlat, outlon, wlat, wlon, slat, slon, stime,
                          vfactor, missingValue, verbose)
    else:
        # BUG FIX: the original fell through to gaussInterp_, which is neither
        # defined nor imported in this module and raised NameError.  The cython
        # implementation lives in gaussInterp.pyx; fail loudly instead.
        raise ValueError('gaussInterp: unsupported optimization %r; only '
                         '"fortran" is available from this module' % (optimization,))

    # Output points that accumulated zero total weight had no valid
    # neighbors: mask them out of the result.
    vinterp = N.ma.masked_where(vweight == 0.0, vinterp)
    return (vinterp, vweight, status)


# ---- gaussInterp.pyx (next file in the diff): cython implementation of the
# same gaussian-weighted smoothing; see that file for the kernel details. ----
#
# gaussInterp.pyx -- cython gaussian-weighted smoothing in lat, lon and time.
#
# Gaussian weighting = exp( vfactor * (((x-x0)/sx)^2 + ((y-y0)/sy)^2 + ((t-t0)/st)^2) )
# Cutoffs for neighbors are set by distance in lat/lon; for time all epochs
# are included.
# (The original header used Fortran-style 'c' comment lines, which are not
# valid cython syntax -- rewritten as '#' comments.)
#

import sys
import numpy as np
cimport numpy as np
from math import exp

# NOTE(review): gaussInterp.py imports this f2py symbol in lowercase as
# `gaussinterp_f`; the mixed-case spelling here looks inconsistent -- confirm
# against the generated module.
from gaussInterp_f import gaussInterp_f

# NOTE(review): the python-level alias and the ctypedef share one name; the
# usual cython convention is DTYPE / DTYPE_t.  Kept as-is for compatibility.
Var_t = np.float
ctypedef np.float_t Var_t

VERBOSE = 1


def gaussInterp(var,                   # bundle of input arrays: masked variable, coordinates
                varNames,              # names in read order: primary variable, then time, lat, lon
                outlat, outlon,        # output lat/lon coordinate vectors
                wlat, wlon,            # window of lat/lon neighbors to gaussian weight (degrees)
                slat, slon, stime,     # gaussian sigmas in lat (deg), lon (deg) & time (days)
                vfactor=-0.6931,       # factor in front of gaussian expression
                missingValue=-9999.,   # value to mark missing values in interp result
                verbose=VERBOSE,       # integer to set verbosity level
                optimization='cython'):  # 'fortran' or 'cython'
    '''Gaussian interpolate in lat, lon, and time to a different lat/lon grid, and over a time window to the center time.
Bundle of arrays (var) contains a 3D masked variable and coordinate arrays for lat, lon, and time read from netdf/hdf files.
Returns the 2D interpolated variable (masked), the weight array, and a status for failures.
    '''
    v = var[varNames[0]][:]
    # BUG FIX: getmaskarray instead of getmask -- getmask returns the scalar
    # ma.nomask for a fully unmasked array, not a full-size mask array.
    vmask = np.ma.getmaskarray(v)
    vtime = var[varNames[1]][:]
    lat = var[varNames[2]][:]
    lon = var[varNames[3]][:]
    # BUG FIX: the original continued these assignments onto the next line
    # with no backslash or parentheses -- a syntax error.
    if optimization == 'fortran':
        vinterp, vweight, status = \
            gaussInterp_f(v, vmask, vtime, lat, lon,
                          outlat, outlon, wlat, wlon, slat, slon, stime,
                          vfactor, missingValue)
    else:
        vinterp, vweight, status = \
            gaussInterp_(v, vmask, vtime, lat, lon,
                         outlat, outlon, wlat, wlon, slat, slon, stime,
                         vfactor, missingValue)
    # BUG FIX: the original masked with make_mask(vweight), which masks every
    # point with NONZERO weight -- i.e. exactly the good points.  Mask the
    # zero-weight points instead, matching the wrapper in gaussInterp.py.
    vinterp = np.ma.masked_where(vweight == 0.0, vinterp)
    return (vinterp, vweight, status)


#@cython.boundscheck(False)
def gaussInterp_(np.ndarray[Var_t, ndim=3] var,     # variable & mask arrays with dimensions of time,lat,lon
                 np.ndarray[Var_t, ndim=3] vmask,
                 np.ndarray[np.int_t, ndim=1] vtime,
                 np.ndarray[np.float_t, ndim=1] lat,
                 np.ndarray[np.float_t, ndim=1] lon,     # coordinate vectors for inputs
                 np.ndarray[np.float_t, ndim=1] outlat,  # coordinate vectors for grid to interpolate to
                 np.ndarray[np.float_t, ndim=1] outlon,
                 float wlat, float wlon,                 # window of lat/lon neighbors to weight (degrees)
                 float slat, float slon, float stime,    # gaussian sigmas in lat (deg), lon (deg) & time (days)
                 float vfactor,                          # factor in front of gaussian expression
                 float missingValue):                    # value to mark missing values in interp result
    '''Gaussian interpolate in lat, lon, and time to a different lat/lon grid, and over a time window to the center time.
Returns the 2D interpolated variable (masked), the weight array, and a status for failures.
    '''
    # BUG FIX: the original assert referenced an undefined name `mask`.
    # NOTE(review): the buffer declaration types vmask as Var_t (float) while
    # this assert expects np.int -- both cannot hold; confirm the intended
    # mask dtype against the callers.
    assert var.dtype == Var_t and vmask.dtype == np.int

    cdef np.ndarray[Var_t, ndim=2] vinterp = np.zeros((outlat.shape[0], outlon.shape[0]), dtype=Var_t)  # interpolated variable, missing values not counted
    cdef np.ndarray[Var_t, ndim=2] vweight = np.zeros((outlat.shape[0], outlon.shape[0]), dtype=Var_t)  # weight of values interpolated (can be zero)
    cdef int status = 0    # negative status indicates error

    cdef int imin, imax, jmin, jmax
    cdef int iin, jin, kin
    cdef int i, j
    cdef double val, fac   # accumulator temporaries (were untyped in the original)

    # BUG FIX: the original read time.shape[0] / time[...] -- `time` is
    # undefined here; the time coordinate vector is vtime.
    cdef int ntime = vtime.shape[0]
    cdef int nlat = lat.shape[0]
    cdef int nlon = lon.shape[0]

    cdef int noutlat = outlat.shape[0]
    cdef int noutlon = outlon.shape[0]

    cdef float wlat2 = wlat / 2.
    cdef float wlon2 = wlon / 2.
    cdef float lat0 = lat[0]
    cdef float lon0 = lon[0]
    cdef float dlat = lat[1] - lat[0]   # assumes uniformly spaced input grid -- TODO confirm
    cdef float dlon = lon[1] - lon[0]
    cdef double midTime = vtime[int(ntime/2 + 0.5)]   # center epoch of the time window

    for i in xrange(noutlat):    # BUG FIX: original read `for i xrange(...)` (missing `in`)
        sys.stderr.write('%s\n' % outlat[i])   # progress trace, one line per output latitude
        for j in xrange(noutlon):
            # Index window of input cells within +/- half a window of the
            # output point, clamped to the grid edges.
            imin = clamp(int((outlat[i] - wlat2 - lat0)/dlat + 0.5), 0, nlat-1)
            imax = clamp(int((outlat[i] + wlat2 - lat0)/dlat + 0.5), 0, nlat-1)
            jmin = clamp(int((outlon[j] - wlon2 - lon0)/dlon + 0.5), 0, nlon-1)
            jmax = clamp(int((outlon[j] + wlon2 - lon0)/dlon + 0.5), 0, nlon-1)

            for kin in xrange(ntime):
                for iin in xrange(imin, imax+1):
                    for jin in xrange(jmin, jmax+1):
                        if not vmask[kin, iin, jin]:
                            fac = exp(vfactor *
                                      (((outlat[i] - lat[iin])/slat)**2 +
                                       ((outlon[j] - lon[jin])/slon)**2 +
                                       ((midTime - vtime[kin])/stime)**2))
                            val = var[kin, iin, jin]
                            if VERBOSE > 1:
                                sys.stderr.write(' '.join(map(str, (kin, iin, jin,
                                    vtime[kin], lat[iin], lon[jin], val, fac, val*fac))) + '\n')
                            # Accumulate inside the mask test so masked cells
                            # contribute neither value nor weight.
                            vinterp[i, j] = vinterp[i, j] + val * fac
                            vweight[i, j] = vweight[i, j] + fac

            if vweight[i, j] != 0.0:
                vinterp[i, j] = vinterp[i, j] / vweight[i, j]   # normalize by total weight
            else:
                vinterp[i, j] = missingValue                    # no valid neighbors at all

    return (vinterp, vweight, status)


cdef int clamp(int i, int n, int m):
    # Clamp i into the inclusive range [n, m].
    if i < n: return n
    if i > m: return m
    return i
