This is an automated email from the ASF dual-hosted git repository.
fgreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/master by this push:
new 12670b1 updated to return iso times and mean (#33)
12670b1 is described below
commit 12670b1a69305beb52d8274911f3b5457f727bcc
Author: fgreg <[email protected]>
AuthorDate: Tue Sep 18 10:40:17 2018 -0700
updated to return iso times and mean (#33)
---
.../webservice/algorithms_spark/TimeAvgMapSpark.py | 264 ++++++++++++---------
1 file changed, 157 insertions(+), 107 deletions(-)
diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
index 19de786..473f4ce 100644
--- a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
@@ -15,131 +15,152 @@
import logging
+from datetime import datetime
import numpy as np
+import shapely.geometry
from nexustiles.nexustiles import NexusTileService
+from pytz import timezone
-# from time import time
-from webservice.NexusHandler import nexus_handler, SparkHandler, DEFAULT_PARAMETERS_SPEC
+from webservice.NexusHandler import nexus_handler, SparkHandler
from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
@nexus_handler
class TimeAvgMapSparkHandlerImpl(SparkHandler):
name = "Time Average Map Spark"
path = "/timeAvgMapSpark"
description = "Computes a Latitude/Longitude Time Average plot given an arbitrary geographical area and time range"
- params = DEFAULT_PARAMETERS_SPEC
+ params = {
+ "ds": {
+ "name": "Dataset",
+ "type": "String",
+ "description": "The dataset used to generate the map. Required"
+ },
+ "startTime": {
+ "name": "Start Time",
+ "type": "string",
+ "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+ },
+ "endTime": {
+ "name": "End Time",
+ "type": "string",
+ "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+ },
+ "b": {
+ "name": "Bounding box",
+ "type": "comma-delimited float",
+ "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+ "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
+ },
+ "spark": {
+ "name": "Spark Configuration",
+ "type": "comma-delimited value",
+ "description": "Configuration used to launch in the Spark cluster. Value should be 3 elements separated by "
+ "commas. 1) Spark Master 2) Number of Spark Executors 3) Number of Spark Partitions. Only "
+ "Number of Spark Partitions is used by this function. Optional (Default: local,1,1)"
+ }
+ }
singleton = True
def __init__(self):
SparkHandler.__init__(self)
self.log = logging.getLogger(__name__)
- # self.log.setLevel(logging.DEBUG)
- @staticmethod
- def _map(tile_in_spark):
- tile_bounds = tile_in_spark[0]
- (min_lat, max_lat, min_lon, max_lon,
- min_y, max_y, min_x, max_x) = tile_bounds
- startTime = tile_in_spark[1]
- endTime = tile_in_spark[2]
- ds = tile_in_spark[3]
- tile_service = NexusTileService()
- # print 'Started tile {0}'.format(tile_bounds)
- # sys.stdout.flush()
- tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
- # days_at_a_time = 90
- days_at_a_time = 30
- # days_at_a_time = 7
- # days_at_a_time = 1
- # print 'days_at_a_time = {0}'.format(days_at_a_time)
- t_incr = 86400 * days_at_a_time
- sum_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.float64))
- cnt_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.uint32))
- t_start = startTime
- while t_start <= endTime:
- t_end = min(t_start + t_incr, endTime)
- # t1 = time()
- # print 'nexus call start at time {0}'.format(t1)
- # sys.stdout.flush()
- # nexus_tiles = \
- # TimeAvgMapSparkHandlerImpl.query_by_parts(tile_service,
- # min_lat, max_lat,
- # min_lon, max_lon,
- # ds,
- # t_start,
- # t_end,
- # part_dim=2)
- nexus_tiles = \
- tile_service.get_tiles_bounded_by_box(min_lat, max_lat,
- min_lon, max_lon,
- ds=ds,
- start_time=t_start,
- end_time=t_end)
- # t2 = time()
- # print 'nexus call end at time %f' % t2
- # print 'secs in nexus call: ', t2 - t1
- # print 't %d to %d - Got %d tiles' % (t_start, t_end,
- # len(nexus_tiles))
- # for nt in nexus_tiles:
- # print nt.granule
- # print nt.section_spec
- # print 'lat min/max:', np.ma.min(nt.latitudes), np.ma.max(nt.latitudes)
- # print 'lon min/max:', np.ma.min(nt.longitudes), np.ma.max(nt.longitudes)
- # sys.stdout.flush()
+ def parse_arguments(self, request):
+ # Parse input arguments
+ self.log.debug("Parsing arguments")
- for tile in nexus_tiles:
- tile.data.data[:, :] = np.nan_to_num(tile.data.data)
- sum_tile += tile.data.data[0, min_y:max_y + 1, min_x:max_x + 1]
- cnt_tile += (~tile.data.mask[0,
- min_y:max_y + 1,
- min_x:max_x + 1]).astype(np.uint8)
- t_start = t_end + 1
+ try:
+ ds = request.get_dataset()
+ if type(ds) == list or type(ds) == tuple:
+ ds = next(iter(ds))
+ except:
+ raise NexusProcessingException(
+ reason="'ds' argument is required. Must be a string",
+ code=400)
+
+ # Do not allow time series on Climatology
+ if next(iter([clim for clim in ds if 'CLIM' in clim]), False):
+ raise NexusProcessingException(
+ reason="Cannot compute Latitude/Longitude Time Average plot on a climatology", code=400)
+
+ try:
+ bounding_polygon = request.get_bounding_polygon()
+ request.get_min_lon = lambda: bounding_polygon.bounds[0]
+ request.get_min_lat = lambda: bounding_polygon.bounds[1]
+ request.get_max_lon = lambda: bounding_polygon.bounds[2]
+ request.get_max_lat = lambda: bounding_polygon.bounds[3]
+ except:
+ try:
+ west, south, east, north = request.get_min_lon(), request.get_min_lat(), \
+ request.get_max_lon(), request.get_max_lat()
+ bounding_polygon = shapely.geometry.Polygon(
+ [(west, south), (east, south), (east, north), (west, north), (west, south)])
+ except:
+ raise NexusProcessingException(
+ reason="'b' argument is required. Must be comma-delimited float formatted as "
+ "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+ "Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+ code=400)
+
+ try:
+ start_time = request.get_start_datetime()
+ except:
+ raise NexusProcessingException(
+ reason="'startTime' argument is required. Can be int value seconds from epoch or "
+ "string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+ try:
+ end_time = request.get_end_datetime()
+ except:
+ raise NexusProcessingException(
+ reason="'endTime' argument is required. Can be int value seconds from epoch or "
+ "string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+
+ if start_time > end_time:
+ raise NexusProcessingException(
+ reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+ request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+ code=400)
+
+ spark_master, spark_nexecs, spark_nparts = request.get_spark_cfg()
+
+ start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
+ end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+
+ return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
+ spark_master, spark_nexecs, spark_nparts
- # print 'cnt_tile = ', cnt_tile
- # cnt_tile.mask = ~(cnt_tile.data.astype(bool))
- # sum_tile.mask = cnt_tile.mask
- # avg_tile = sum_tile / cnt_tile
- # stats_tile = [[{'avg': avg_tile.data[y,x], 'cnt': cnt_tile.data[y,x]} for x in range(tile_inbounds_shape[1])] for y in range(tile_inbounds_shape[0])]
- # print 'Finished tile', tile_bounds
- # print 'Tile avg = ', avg_tile
- # sys.stdout.flush()
- return ((min_lat, max_lat, min_lon, max_lon), (sum_tile, cnt_tile))
-
- def calc(self, computeOptions, **args):
+ def calc(self, compute_options, **args):
"""
- :param computeOptions: StatsComputeOptions
+ :param compute_options: StatsComputeOptions
:param args: dict
:return:
"""
- spark_master, spark_nexecs, spark_nparts = computeOptions.get_spark_cfg()
- self._setQueryParams(computeOptions.get_dataset()[0],
- (float(computeOptions.get_min_lat()),
- float(computeOptions.get_max_lat()),
- float(computeOptions.get_min_lon()),
- float(computeOptions.get_max_lon())),
- computeOptions.get_start_time(),
- computeOptions.get_end_time(),
+ ds, bbox, start_time, end_time, spark_master, spark_nexecs, spark_nparts = self.parse_arguments(compute_options)
+
+ compute_options.get_spark_cfg()
+
+ self._setQueryParams(ds,
+ (float(bbox.bounds[1]),
+ float(bbox.bounds[3]),
+ float(bbox.bounds[0]),
+ float(bbox.bounds[2])),
+ start_time,
+ end_time,
spark_master=spark_master,
spark_nexecs=spark_nexecs,
spark_nparts=spark_nparts)
- if 'CLIM' in self._ds:
- raise NexusProcessingException(
- reason="Cannot compute Latitude/Longitude Time Average plot on a climatology", code=400)
-
nexus_tiles = self._find_global_tile_set()
- # print 'tiles:'
- # for tile in nexus_tiles:
- # print tile.granule
- # print tile.section_spec
- # print 'lat:', tile.latitudes
- # print 'lon:', tile.longitudes
-
- # nexus_tiles)
+
if len(nexus_tiles) == 0:
raise NoDataException(reason="No data found for selected timeframe")
@@ -152,14 +173,11 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
self.log.debug('center lon range = {0} to {1}'.format(self._minLonCent,
self._maxLonCent))
- # for tile in nexus_tiles:
- # print 'lats: ', tile.latitudes.compressed()
- # print 'lons: ', tile.longitudes.compressed()
# Create array of tuples to pass to Spark map function
nexus_tiles_spark = [[self._find_tile_bounds(t),
self._startTime, self._endTime,
self._ds] for t in nexus_tiles]
- # print 'nexus_tiles_spark = ', nexus_tiles_spark
+
# Remove empty tiles (should have bounds set to None)
bad_tile_inds = np.where([t[0] is None for t in nexus_tiles_spark])[0]
for i in np.flipud(bad_tile_inds):
@@ -168,7 +186,7 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
# Expand Spark map tuple array by duplicating each entry N times,
# where N is the number of ways we want the time dimension carved up.
num_time_parts = 72
- # num_time_parts = 1
+
nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts,
axis=0)
self.log.debug('repeated len(nexus_tiles_spark) = {0}'.format(len(nexus_tiles_spark)))
@@ -183,9 +201,6 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
len(nexus_tiles_spark) / num_time_parts,
axis=0).reshape((len(nexus_tiles_spark), 2))
self.log.debug('spark_part_time_ranges={0}'.format(spark_part_time_ranges))
nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
- # print 'nexus_tiles_spark final = '
- # for i in range(len(nexus_tiles_spark)):
- # print nexus_tiles_spark[i]
# Launch Spark computations
rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts)
@@ -246,13 +261,48 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
self._create_nc_file(a, 'tam.nc', 'val', fill=self._fill)
# Create dict for JSON response
- results = [[{'avg': a[y, x], 'cnt': int(n[y, x]),
+ results = [[{'mean': a[y, x], 'cnt': int(n[y, x]),
'lat': self._ind2lat(y), 'lon': self._ind2lon(x)}
for x in range(a.shape[1])] for y in range(a.shape[0])]
- return TimeAvgMapSparkResults(results=results, meta={}, computeOptions=computeOptions)
+ return NexusResults(results=results, meta={}, stats=None,
+ computeOptions=None, minLat=bbox.bounds[1],
+ maxLat=bbox.bounds[3], minLon=bbox.bounds[0],
+ maxLon=bbox.bounds[2], ds=ds, startTime=start_time,
+ endTime=end_time)
+
+ @staticmethod
+ def _map(tile_in_spark):
+ tile_bounds = tile_in_spark[0]
+ (min_lat, max_lat, min_lon, max_lon,
+ min_y, max_y, min_x, max_x) = tile_bounds
+ startTime = tile_in_spark[1]
+ endTime = tile_in_spark[2]
+ ds = tile_in_spark[3]
+ tile_service = NexusTileService()
+
+ tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
+
+ days_at_a_time = 30
+
+ t_incr = 86400 * days_at_a_time
+ sum_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.float64))
+ cnt_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.uint32))
+ t_start = startTime
+ while t_start <= endTime:
+ t_end = min(t_start + t_incr, endTime)
+
+ nexus_tiles = \
+ tile_service.get_tiles_bounded_by_box(min_lat, max_lat,
+ min_lon, max_lon,
+ ds=ds,
+ start_time=t_start,
+ end_time=t_end)
+ for tile in nexus_tiles:
+ tile.data.data[:, :] = np.nan_to_num(tile.data.data)
+ sum_tile += tile.data.data[0, min_y:max_y + 1, min_x:max_x + 1]
+ cnt_tile += (~tile.data.mask[0, min_y:max_y + 1, min_x:max_x + 1]).astype(np.uint8)
+ t_start = t_end + 1
-class TimeAvgMapSparkResults(NexusResults):
- def __init__(self, results=None, meta=None, computeOptions=None):
- NexusResults.__init__(self, results=results, meta=meta, stats=None, computeOptions=computeOptions)
+ return (min_lat, max_lat, min_lon, max_lon), (sum_tile, cnt_tile)