This is an automated email from the ASF dual-hosted git repository.

fgreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/master by this push:
     new 12670b1  updated to return iso times and mean (#33)
12670b1 is described below

commit 12670b1a69305beb52d8274911f3b5457f727bcc
Author: fgreg <[email protected]>
AuthorDate: Tue Sep 18 10:40:17 2018 -0700

    updated to return iso times and mean (#33)
---
 .../webservice/algorithms_spark/TimeAvgMapSpark.py | 264 ++++++++++++---------
 1 file changed, 157 insertions(+), 107 deletions(-)

diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py 
b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
index 19de786..473f4ce 100644
--- a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
@@ -15,131 +15,152 @@
 
 
 import logging
+from datetime import datetime
 
 import numpy as np
+import shapely.geometry
 from nexustiles.nexustiles import NexusTileService
+from pytz import timezone
 
-# from time import time
-from webservice.NexusHandler import nexus_handler, SparkHandler, 
DEFAULT_PARAMETERS_SPEC
+from webservice.NexusHandler import nexus_handler, SparkHandler
 from webservice.webmodel import NexusResults, NexusProcessingException, 
NoDataException
 
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
 
 @nexus_handler
 class TimeAvgMapSparkHandlerImpl(SparkHandler):
     name = "Time Average Map Spark"
     path = "/timeAvgMapSpark"
     description = "Computes a Latitude/Longitude Time Average plot given an 
arbitrary geographical area and time range"
-    params = DEFAULT_PARAMETERS_SPEC
+    params = {
+        "ds": {
+            "name": "Dataset",
+            "type": "String",
+            "description": "The dataset used to generate the map. Required"
+        },
+        "startTime": {
+            "name": "Start Time",
+            "type": "string",
+            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or 
seconds since EPOCH. Required"
+        },
+        "endTime": {
+            "name": "End Time",
+            "type": "string",
+            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or 
seconds since EPOCH. Required"
+        },
+        "b": {
+            "name": "Bounding box",
+            "type": "comma-delimited float",
+            "description": "Minimum (Western) Longitude, Minimum (Southern) 
Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) 
Latitude. Required"
+        },
+        "spark": {
+            "name": "Spark Configuration",
+            "type": "comma-delimited value",
+            "description": "Configuration used to launch in the Spark cluster. 
Value should be 3 elements separated by "
+                           "commas. 1) Spark Master 2) Number of Spark 
Executors 3) Number of Spark Partitions. Only "
+                           "Number of Spark Partitions is used by this 
function. Optional (Default: local,1,1)"
+        }
+    }
     singleton = True
 
     def __init__(self):
         SparkHandler.__init__(self)
         self.log = logging.getLogger(__name__)
-        # self.log.setLevel(logging.DEBUG)
 
-    @staticmethod
-    def _map(tile_in_spark):
-        tile_bounds = tile_in_spark[0]
-        (min_lat, max_lat, min_lon, max_lon,
-         min_y, max_y, min_x, max_x) = tile_bounds
-        startTime = tile_in_spark[1]
-        endTime = tile_in_spark[2]
-        ds = tile_in_spark[3]
-        tile_service = NexusTileService()
-        # print 'Started tile {0}'.format(tile_bounds)
-        # sys.stdout.flush()
-        tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
-        # days_at_a_time = 90
-        days_at_a_time = 30
-        # days_at_a_time = 7
-        # days_at_a_time = 1
-        # print 'days_at_a_time = {0}'.format(days_at_a_time)
-        t_incr = 86400 * days_at_a_time
-        sum_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.float64))
-        cnt_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.uint32))
-        t_start = startTime
-        while t_start <= endTime:
-            t_end = min(t_start + t_incr, endTime)
-            # t1 = time()
-            # print 'nexus call start at time {0}'.format(t1)
-            # sys.stdout.flush()
-            # nexus_tiles = \
-            #    TimeAvgMapSparkHandlerImpl.query_by_parts(tile_service,
-            #                                              min_lat, max_lat, 
-            #                                              min_lon, max_lon, 
-            #                                              ds, 
-            #                                              t_start, 
-            #                                              t_end,
-            #                                              part_dim=2)
-            nexus_tiles = \
-                tile_service.get_tiles_bounded_by_box(min_lat, max_lat,
-                                                      min_lon, max_lon,
-                                                      ds=ds,
-                                                      start_time=t_start,
-                                                      end_time=t_end)
-            # t2 = time()
-            # print 'nexus call end at time %f' % t2
-            # print 'secs in nexus call: ', t2 - t1
-            # print 't %d to %d - Got %d tiles' % (t_start, t_end,
-            #                                     len(nexus_tiles))
-            # for nt in nexus_tiles:
-            #    print nt.granule
-            #    print nt.section_spec
-            #    print 'lat min/max:', np.ma.min(nt.latitudes), 
np.ma.max(nt.latitudes)
-            #    print 'lon min/max:', np.ma.min(nt.longitudes), 
np.ma.max(nt.longitudes)
-            # sys.stdout.flush()
+    def parse_arguments(self, request):
+        # Parse input arguments
+        self.log.debug("Parsing arguments")
 
-            for tile in nexus_tiles:
-                tile.data.data[:, :] = np.nan_to_num(tile.data.data)
-                sum_tile += tile.data.data[0, min_y:max_y + 1, min_x:max_x + 1]
-                cnt_tile += (~tile.data.mask[0,
-                              min_y:max_y + 1,
-                              min_x:max_x + 1]).astype(np.uint8)
-            t_start = t_end + 1
+        try:
+            ds = request.get_dataset()
+            if type(ds) == list or type(ds) == tuple:
+                ds = next(iter(ds))
+        except:
+            raise NexusProcessingException(
+                reason="'ds' argument is required. Must be a string",
+                code=400)
+
+        # Do not allow time series on Climatology
+        if next(iter([clim for clim in ds if 'CLIM' in clim]), False):
+            raise NexusProcessingException(
+                reason="Cannot compute Latitude/Longitude Time Average plot on 
a climatology", code=400)
+
+        try:
+            bounding_polygon = request.get_bounding_polygon()
+            request.get_min_lon = lambda: bounding_polygon.bounds[0]
+            request.get_min_lat = lambda: bounding_polygon.bounds[1]
+            request.get_max_lon = lambda: bounding_polygon.bounds[2]
+            request.get_max_lat = lambda: bounding_polygon.bounds[3]
+        except:
+            try:
+                west, south, east, north = request.get_min_lon(), 
request.get_min_lat(), \
+                                           request.get_max_lon(), 
request.get_max_lat()
+                bounding_polygon = shapely.geometry.Polygon(
+                    [(west, south), (east, south), (east, north), (west, 
north), (west, south)])
+            except:
+                raise NexusProcessingException(
+                    reason="'b' argument is required. Must be comma-delimited 
float formatted as "
+                           "Minimum (Western) Longitude, Minimum (Southern) 
Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) 
Latitude",
+                    code=400)
+
+        try:
+            start_time = request.get_start_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'startTime' argument is required. Can be int value 
seconds from epoch or "
+                       "string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+        try:
+            end_time = request.get_end_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'endTime' argument is required. Can be int value 
seconds from epoch or "
+                       "string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+
+        if start_time > end_time:
+            raise NexusProcessingException(
+                reason="The starting time must be before the ending time. 
Received startTime: %s, endTime: %s" % (
+                    request.get_start_datetime().strftime(ISO_8601), 
request.get_end_datetime().strftime(ISO_8601)),
+                code=400)
+
+        spark_master, spark_nexecs, spark_nparts = request.get_spark_cfg()
+
+        start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
+        end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+
+        return ds, bounding_polygon, start_seconds_from_epoch, 
end_seconds_from_epoch, \
+               spark_master, spark_nexecs, spark_nparts
 
-        # print 'cnt_tile = ', cnt_tile
-        # cnt_tile.mask = ~(cnt_tile.data.astype(bool))
-        # sum_tile.mask = cnt_tile.mask
-        # avg_tile = sum_tile / cnt_tile
-        # stats_tile = [[{'avg': avg_tile.data[y,x], 'cnt': 
cnt_tile.data[y,x]} for x in range(tile_inbounds_shape[1])] for y in 
range(tile_inbounds_shape[0])]
-        # print 'Finished tile', tile_bounds
-        # print 'Tile avg = ', avg_tile
-        # sys.stdout.flush()
-        return ((min_lat, max_lat, min_lon, max_lon), (sum_tile, cnt_tile))
-
-    def calc(self, computeOptions, **args):
+    def calc(self, compute_options, **args):
         """
 
-        :param computeOptions: StatsComputeOptions
+        :param compute_options: StatsComputeOptions
         :param args: dict
         :return:
         """
 
-        spark_master, spark_nexecs, spark_nparts = 
computeOptions.get_spark_cfg()
-        self._setQueryParams(computeOptions.get_dataset()[0],
-                             (float(computeOptions.get_min_lat()),
-                              float(computeOptions.get_max_lat()),
-                              float(computeOptions.get_min_lon()),
-                              float(computeOptions.get_max_lon())),
-                             computeOptions.get_start_time(),
-                             computeOptions.get_end_time(),
+        ds, bbox, start_time, end_time, spark_master, spark_nexecs, 
spark_nparts = self.parse_arguments(compute_options)
+
+        compute_options.get_spark_cfg()
+
+        self._setQueryParams(ds,
+                             (float(bbox.bounds[1]),
+                              float(bbox.bounds[3]),
+                              float(bbox.bounds[0]),
+                              float(bbox.bounds[2])),
+                             start_time,
+                             end_time,
                              spark_master=spark_master,
                              spark_nexecs=spark_nexecs,
                              spark_nparts=spark_nparts)
 
-        if 'CLIM' in self._ds:
-            raise NexusProcessingException(
-                reason="Cannot compute Latitude/Longitude Time Average plot on 
a climatology", code=400)
-
         nexus_tiles = self._find_global_tile_set()
-        # print 'tiles:'
-        # for tile in nexus_tiles:
-        #     print tile.granule
-        #     print tile.section_spec
-        #     print 'lat:', tile.latitudes
-        #     print 'lon:', tile.longitudes
-
-        #                                                          nexus_tiles)
+
         if len(nexus_tiles) == 0:
             raise NoDataException(reason="No data found for selected 
timeframe")
 
@@ -152,14 +173,11 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
         self.log.debug('center lon range = {0} to {1}'.format(self._minLonCent,
                                                               
self._maxLonCent))
 
-        # for tile in nexus_tiles:
-        #    print 'lats: ', tile.latitudes.compressed()
-        #    print 'lons: ', tile.longitudes.compressed()
         # Create array of tuples to pass to Spark map function
         nexus_tiles_spark = [[self._find_tile_bounds(t),
                               self._startTime, self._endTime,
                               self._ds] for t in nexus_tiles]
-        # print 'nexus_tiles_spark = ', nexus_tiles_spark
+
         # Remove empty tiles (should have bounds set to None)
         bad_tile_inds = np.where([t[0] is None for t in nexus_tiles_spark])[0]
         for i in np.flipud(bad_tile_inds):
@@ -168,7 +186,7 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
         # Expand Spark map tuple array by duplicating each entry N times,
         # where N is the number of ways we want the time dimension carved up.
         num_time_parts = 72
-        # num_time_parts = 1
+
         nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, 
axis=0)
         self.log.debug('repeated len(nexus_tiles_spark) = 
{0}'.format(len(nexus_tiles_spark)))
 
@@ -183,9 +201,6 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
                       len(nexus_tiles_spark) / num_time_parts, 
axis=0).reshape((len(nexus_tiles_spark), 2))
         
self.log.debug('spark_part_time_ranges={0}'.format(spark_part_time_ranges))
         nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
-        # print 'nexus_tiles_spark final = '
-        # for i in range(len(nexus_tiles_spark)):
-        #    print nexus_tiles_spark[i]
 
         # Launch Spark computations
         rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts)
@@ -246,13 +261,48 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
         self._create_nc_file(a, 'tam.nc', 'val', fill=self._fill)
 
         # Create dict for JSON response
-        results = [[{'avg': a[y, x], 'cnt': int(n[y, x]),
+        results = [[{'mean': a[y, x], 'cnt': int(n[y, x]),
                      'lat': self._ind2lat(y), 'lon': self._ind2lon(x)}
                     for x in range(a.shape[1])] for y in range(a.shape[0])]
 
-        return TimeAvgMapSparkResults(results=results, meta={}, 
computeOptions=computeOptions)
+        return NexusResults(results=results, meta={}, stats=None,
+                            computeOptions=None, minLat=bbox.bounds[1],
+                            maxLat=bbox.bounds[3], minLon=bbox.bounds[0],
+                            maxLon=bbox.bounds[2], ds=ds, startTime=start_time,
+                            endTime=end_time)
+
+    @staticmethod
+    def _map(tile_in_spark):
+        tile_bounds = tile_in_spark[0]
+        (min_lat, max_lat, min_lon, max_lon,
+         min_y, max_y, min_x, max_x) = tile_bounds
+        startTime = tile_in_spark[1]
+        endTime = tile_in_spark[2]
+        ds = tile_in_spark[3]
+        tile_service = NexusTileService()
+
+        tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
+
+        days_at_a_time = 30
+
+        t_incr = 86400 * days_at_a_time
+        sum_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.float64))
+        cnt_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.uint32))
+        t_start = startTime
+        while t_start <= endTime:
+            t_end = min(t_start + t_incr, endTime)
+
+            nexus_tiles = \
+                tile_service.get_tiles_bounded_by_box(min_lat, max_lat,
+                                                      min_lon, max_lon,
+                                                      ds=ds,
+                                                      start_time=t_start,
+                                                      end_time=t_end)
 
+            for tile in nexus_tiles:
+                tile.data.data[:, :] = np.nan_to_num(tile.data.data)
+                sum_tile += tile.data.data[0, min_y:max_y + 1, min_x:max_x + 1]
+                cnt_tile += (~tile.data.mask[0, min_y:max_y + 1, min_x:max_x + 
1]).astype(np.uint8)
+            t_start = t_end + 1
 
-class TimeAvgMapSparkResults(NexusResults):
-    def __init__(self, results=None, meta=None, computeOptions=None):
-        NexusResults.__init__(self, results=results, meta=meta, stats=None, 
computeOptions=computeOptions)
+        return (min_lat, max_lat, min_lon, max_lon), (sum_tile, cnt_tile)

Reply via email to