fgreg closed pull request #35: SDAP-93 Clean up and document HofMoellerSpark
URL: https://github.com/apache/incubator-sdap-nexus/pull/35
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/analysis/.idea/vcs.xml b/analysis/.idea/vcs.xml
new file mode 100644
index 0000000..6c0b863
--- /dev/null
+++ b/analysis/.idea/vcs.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="VcsDirectoryMappings">
+    <mapping directory="$PROJECT_DIR$/.." vcs="Git" />
+  </component>
+</project>
\ No newline at end of file
diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
index 92d7a70..ea4a37d 100644
--- a/analysis/webservice/algorithms_spark/HofMoellerSpark.py
+++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
@@ -13,22 +13,25 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import sys
 import itertools
 import logging
-import traceback
 from cStringIO import StringIO
 from datetime import datetime
 
 import matplotlib.pyplot as plt
 import mpld3
 import numpy as np
+import shapely.geometry
 from matplotlib import cm
 from matplotlib.ticker import FuncFormatter
 from nexustiles.nexustiles import NexusTileService
+from pytz import timezone
 
-from webservice.NexusHandler import SparkHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC
-from webservice.webmodel import NexusProcessingException, NexusResults
+from webservice.NexusHandler import SparkHandler, nexus_handler
+from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException
+
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%SZ'
 
 SENTINEL = 'STOP'
 LATITUDE = 0
@@ -50,9 +53,8 @@ def hofmoeller_stats(tile_in_spark):
             tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat,
                                                    min_lon, max_lon, [tile])[0]
         except IndexError:
-            #return None
+            # return None
             return []
-
         t = np.ma.min(tile.times)
         stats = []
 
@@ -65,14 +67,13 @@ def hofmoeller_stats(tile_in_spark):
             # Longitude-Time Map (Average over latitudes)
             data = sorted(points, key=lambda p: p.longitude)
             points_by_coord = itertools.groupby(data, key=lambda p: p.longitude)
-            
- 
+
         for coord, points_at_coord in points_by_coord:
             values_at_coord = np.array([[p.data_val,
-                                        np.cos(np.radians(p.latitude))]
+                                         np.cos(np.radians(p.latitude))]
                                         for p in points_at_coord])
-            vals = np.nan_to_num(values_at_coord[:,0])
-            weights = values_at_coord[:,1]
+            vals = np.nan_to_num(values_at_coord[:, 0])
+            weights = values_at_coord[:, 1]
             coord_cnt = len(values_at_coord)
             if latlon == 0:
                 # Latitude-Time Map (Average over longitudes)
@@ -96,10 +97,107 @@ def hofmoeller_stats(tile_in_spark):
 
 
 class BaseHoffMoellerHandlerImpl(SparkHandler):
+    params = {
+        "ds": {
+            "name": "Dataset",
+            "type": "comma-delimited string",
+            "description": "The dataset(s) Used to generate the plot. Required"
+        },
+        "startTime": {
+            "name": "Start Time",
+            "type": "string",
+            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+        },
+        "endTime": {
+            "name": "End Time",
+            "type": "string",
+            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+        },
+        "b": {
+            "name": "Bounding box",
+            "type": "comma-delimited float",
+            "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
+        },
+        "spark": {
+            "name": "Spark Configuration",
+            "type": "comma-delimited value",
+            "description": "Configuration used to launch in the Spark cluster. Value should be 3 elements separated by "
+                           "commas. 1) Spark Master 2) Number of Spark Executors 3) Number of Spark Partitions. Only "
+                           "Number of Spark Partitions is used by this function. Optional (Default: local,1,1)"
+        }
+    }
+
     def __init__(self):
         SparkHandler.__init__(self)
         self.log = logging.getLogger(__name__)
 
+    def parse_arguments(self, request):
+        # Parse input arguments
+        self.log.debug("Parsing arguments")
+
+        try:
+            ds = request.get_dataset()
+            if type(ds) == list or type(ds) == tuple:
+                ds = next(iter(ds))
+        except:
+            raise NexusProcessingException(
+                reason="'ds' argument is required. Must be a string",
+                code=400)
+
+        # Do not allow time series on Climatology
+        if next(iter([clim for clim in ds if 'CLIM' in clim]), False):
+            raise NexusProcessingException(
+                reason="Cannot compute Latitude/Longitude Time Average plot on a climatology", code=400)
+
+        try:
+            bounding_polygon = request.get_bounding_polygon()
+            request.get_min_lon = lambda: bounding_polygon.bounds[0]
+            request.get_min_lat = lambda: bounding_polygon.bounds[1]
+            request.get_max_lon = lambda: bounding_polygon.bounds[2]
+            request.get_max_lat = lambda: bounding_polygon.bounds[3]
+        except:
+            try:
+                west, south, east, north = request.get_min_lon(), request.get_min_lat(), \
+                                           request.get_max_lon(), request.get_max_lat()
+                bounding_polygon = shapely.geometry.Polygon(
+                    [(west, south), (east, south), (east, north), (west, north), (west, south)])
+            except:
+                raise NexusProcessingException(
+                    reason="'b' argument is required. Must be comma-delimited float formatted as "
+                           "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+                    code=400)
+
+        try:
+            start_time = request.get_start_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'startTime' argument is required. Can be int value seconds from epoch or "
+                       "string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+        try:
+            end_time = request.get_end_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'endTime' argument is required. Can be int value seconds from epoch or "
+                       "string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+
+        if start_time > end_time:
+            raise NexusProcessingException(
+                reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+                    request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+                code=400)
+
+        spark_master, spark_nexecs, spark_nparts = request.get_spark_cfg()
+
+        start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
+        end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+
+        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
+               spark_master, spark_nexecs, spark_nparts
+
     def applyDeseasonToHofMoellerByField(self, results, pivot="lats", field="avg", append=True):
         shape = (len(results), len(results[0][pivot]))
         if shape[0] <= 12:
@@ -146,21 +244,22 @@ def hof_tuple_time(t):
 
 
 def hof_tuple_combine(t1, t2):
-    return (t1[0], # Time
-            t1[1], # Sequence (index)
-            t1[2], # Coordinate on axis (latitude or longitude)
-            t1[3] + t2[3], # Number of values
-            t1[4] + t2[4], # Sum of values (weighted for lon-time maps)
-            t1[5] + t2[5], # Sum of weights (= # of values for lat-time maps)
-            max(t1[6], t2[6]), # Maximum value
-            min(t1[7], t2[7]), # Minimum value
-            parallel_variance(t1[4]/t1[5], t1[3], t1[8], 
-                              t2[4]/t2[5], t2[3], t2[8])) # Variance
+    return (t1[0],  # Time
+            t1[1],  # Sequence (index)
+            t1[2],  # Coordinate on axis (latitude or longitude)
+            t1[3] + t2[3],  # Number of values
+            t1[4] + t2[4],  # Sum of values (weighted for lon-time maps)
+            t1[5] + t2[5],  # Sum of weights (= # of values for lat-time maps)
+            max(t1[6], t2[6]),  # Maximum value
+            min(t1[7], t2[7]),  # Minimum value
+            parallel_variance(t1[4] / t1[5], t1[3], t1[8],
+                              t2[4] / t2[5], t2[3], t2[8]))  # Variance
+
 
 def hof_tuple_to_dict(t, avg_var_name):
     return {avg_var_name: t[2],
             'cnt': t[3],
-            'avg': t[4] / t[5],
+            'mean': t[4] / t[5],
             'std': np.sqrt(t[8]),
             'max': t[6],
             'min': t[7]}
@@ -176,7 +275,7 @@ def spark_driver(sc, latlon, nexus_tiles_spark):
         # Longitude-Time Map (Average over latitudes)
         avg_var_name = 'longitude'
         avg_var_name_collection = 'lons'
-        
+
     # Create a set of key-value pairs where the key is (time, lat|lon) and
     # the value is a tuple of intermediate statistics for the specified
     # coordinate within a single NEXUS tile.
@@ -184,7 +283,7 @@ def spark_driver(sc, latlon, nexus_tiles_spark):
 
     # Combine tuples across tiles with input key = (time, lat|lon)
     # Output a key value pair with key = (time)
-    results = results.combineByKey(lambda val: (hof_tuple_time(val),val),
+    results = results.combineByKey(lambda val: (hof_tuple_time(val), val),
                                    lambda x, val: (hof_tuple_time(x),
                                                    hof_tuple_combine(x[1],
                                                                      val)),
@@ -194,29 +293,32 @@ def spark_driver(sc, latlon, nexus_tiles_spark):
 
     # Convert the tuples to dictionary entries and combine coordinates
     # with the same time stamp.  Here we have input key = (time)
-    results = results.values().\
-              combineByKey(lambda val, avg_var_name=avg_var_name,
-                           avg_var_name_collection=avg_var_name_collection: {
-                               'sequence': val[1],
-                               'time': val[0],
-                               avg_var_name_collection: [
-                                   hof_tuple_to_dict(val, avg_var_name)]},
-                           lambda x, val, avg_var_name=avg_var_name,
-                           avg_var_name_collection=avg_var_name_collection: {
-                               'sequence': x['sequence'],
-                               'time': x['time'],
-                               avg_var_name_collection: (
-                                   x[avg_var_name_collection] +
-                                   [hof_tuple_to_dict(val, avg_var_name)])},
-                           lambda x, y,
-                             avg_var_name_collection=avg_var_name_collection:
-                             {'sequence': x['sequence'],
-                              'time': x['time'],
-                              avg_var_name_collection: (
-                                  x[avg_var_name_collection] +
-                                  y[avg_var_name_collection])}).\
-              values().\
-              collect()
+    results = results.values(). \
+        combineByKey(lambda val, avg_var_name=avg_var_name,
+                            avg_var_name_collection=avg_var_name_collection: {
+        'sequence': val[1],
+        'time': val[0],
+        'iso_time': datetime.utcfromtimestamp(val[0]).strftime(ISO_8601),
+        avg_var_name_collection: [
+            hof_tuple_to_dict(val, avg_var_name)]},
+                     lambda x, val, avg_var_name=avg_var_name,
+                            avg_var_name_collection=avg_var_name_collection: {
+                         'sequence': x['sequence'],
+                         'time': x['time'],
+                         'iso_time': x['iso_time'],
+                         avg_var_name_collection: (
+                                 x[avg_var_name_collection] +
+                                 [hof_tuple_to_dict(val, avg_var_name)])},
+                     lambda x, y,
+                            avg_var_name_collection=avg_var_name_collection:
+                     {'sequence': x['sequence'],
+                      'time': x['time'],
+                      'iso_time': x['iso_time'],
+                      avg_var_name_collection: (
+                              x[avg_var_name_collection] +
+                              y[avg_var_name_collection])}). \
+        values(). \
+        collect()
 
     return results
 
@@ -226,26 +328,28 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
     name = "Latitude/Time HofMoeller Spark"
     path = "/latitudeTimeHofMoellerSpark"
     description = "Computes a latitude/time HofMoeller plot given an arbitrary 
geographical area and time range"
-    params = DEFAULT_PARAMETERS_SPEC
+    params = BaseHoffMoellerHandlerImpl.params
     singleton = True
 
     def __init__(self):
-        self._latlon = 0 # 0 for latitude-time map, 1 for longitude-time map
+        self._latlon = 0  # 0 for latitude-time map, 1 for longitude-time map
         BaseHoffMoellerHandlerImpl.__init__(self)
 
-    def calc(self, computeOptions, **args):
-        nexus_tiles_spark = [(self._latlon, tile.tile_id, x,
-                              computeOptions.get_min_lat(),
-                              computeOptions.get_max_lat(),
-                              computeOptions.get_min_lon(),
-                              computeOptions.get_max_lon())
-                             for x, tile in enumerate(self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(), computeOptions.get_min_lon(), computeOptions.get_max_lon(), computeOptions.get_dataset()[0], computeOptions.get_start_time(), computeOptions.get_end_time(), fetch_data=False))]
+    def calc(self, compute_options, **args):
+        ds, bbox, start_time, end_time, spark_master, spark_nexecs, spark_nparts = self.parse_arguments(compute_options)
+
+        min_lon, min_lat, max_lon, max_lat = bbox.bounds
+
+        nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in
+                             enumerate(self._tile_service.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
+                                                                            ds, start_time, end_time,
+                                                                            fetch_data=False))]
+
         print ("Got {} tiles".format(len(nexus_tiles_spark)))
         if len(nexus_tiles_spark) == 0:
-            raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe")
+            raise NoDataException(reason="No data found for selected timeframe")
 
-        results = spark_driver (self._sc, self._latlon, nexus_tiles_spark)
-                                                        
+        results = spark_driver(self._sc, self._latlon, nexus_tiles_spark)
         results = filter(None, results)
         results = sorted(results, key=lambda entry: entry['time'])
         for i in range(len(results)):
@@ -253,7 +357,9 @@ def calc(self, computeOptions, **args):
                                         key=lambda entry: entry['latitude'])
         results = self.applyDeseasonToHofMoeller(results)
 
-        result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LATITUDE)
+        result = HoffMoellerResults(results=results, compute_options=None, type=HoffMoellerResults.LATITUDE,
+                                    minLat=min_lat, maxLat=max_lat, minLon=min_lon,
+                                    maxLon=max_lon, ds=ds, startTime=start_time, endTime=end_time)
         return result
 
 
@@ -262,25 +368,28 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
     name = "Longitude/Time HofMoeller Spark"
     path = "/longitudeTimeHofMoellerSpark"
     description = "Computes a longitude/time HofMoeller plot given an 
arbitrary geographical area and time range"
-    params = DEFAULT_PARAMETERS_SPEC
+    params = BaseHoffMoellerHandlerImpl.params
     singleton = True
 
     def __init__(self):
-        self._latlon = 1 # 0 for latitude-time map; 1 for longitude-time map
+        self._latlon = 1  # 0 for latitude-time map; 1 for longitude-time map
         BaseHoffMoellerHandlerImpl.__init__(self)
 
-    def calc(self, computeOptions, **args):
-        nexus_tiles_spark = [(self._latlon, tile.tile_id, x,
-                              computeOptions.get_min_lat(),
-                              computeOptions.get_max_lat(),
-                              computeOptions.get_min_lon(),
-                              computeOptions.get_max_lon())
-                             for x, tile in enumerate(self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(), computeOptions.get_min_lon(), computeOptions.get_max_lon(), computeOptions.get_dataset()[0], computeOptions.get_start_time(), computeOptions.get_end_time(), fetch_data=False))]
+    def calc(self, compute_options, **args):
+        ds, bbox, start_time, end_time, spark_master, spark_nexecs, spark_nparts = self.parse_arguments(compute_options)
+
+        min_lon, min_lat, max_lon, max_lat = bbox.bounds
+
+        nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in
+                             enumerate(self._tile_service.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
+                                                                            ds, start_time, end_time,
+                                                                            fetch_data=False))]
 
+        print ("Got {} tiles".format(len(nexus_tiles_spark)))
         if len(nexus_tiles_spark) == 0:
-            raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe")
+            raise NoDataException(reason="No data found for selected timeframe")
 
-        results = spark_driver (self._sc, self._latlon, nexus_tiles_spark)
+        results = spark_driver(self._sc, self._latlon, nexus_tiles_spark)
 
         results = filter(None, results)
         results = sorted(results, key=lambda entry: entry["time"])
@@ -290,7 +399,9 @@ def calc(self, computeOptions, **args):
 
         results = self.applyDeseasonToHofMoeller(results, pivot="lons")
 
-        result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LONGITUDE)
+        result = HoffMoellerResults(results=results, compute_options=None, type=HoffMoellerResults.LONGITUDE,
+                                    minLat=min_lat, maxLat=max_lat, minLon=min_lon,
+                                    maxLon=max_lon, ds=ds, startTime=start_time, endTime=end_time)
         return result
 
 
@@ -298,9 +409,10 @@ class HoffMoellerResults(NexusResults):
     LATITUDE = 0
     LONGITUDE = 1
 
-    def __init__(self, results=None, meta=None, stats=None, computeOptions=None, **args):
-        NexusResults.__init__(self, results=results, meta=meta, stats=stats, computeOptions=computeOptions)
-        self.__type = args['type']
+    def __init__(self, results=None, meta=None, stats=None, **kwargs):
+
+        NexusResults.__init__(self, results=results, meta=meta, stats=stats, computeOptions=None, **kwargs)
+        self.__type = kwargs['type']
 
     def createHoffmueller(self, data, coordSeries, timeSeries, coordName, title, interpolate='nearest'):
         cmap = cm.coolwarm
@@ -399,22 +511,3 @@ def toImage(self):
             return self.createLongitudeHoffmueller(res, meta)
         else:
             raise Exception("Unsupported HoffMoeller Plot Type")
-
-
-def pool_worker(type, work_queue, done_queue):
-    try:
-
-        if type == LATITUDE:
-            calculator = LatitudeHofMoellerCalculator()
-        elif type == LONGITUDE:
-            calculator = LongitudeHofMoellerCalculator()
-
-        for work in iter(work_queue.get, SENTINEL):
-            scifunction = work[0]
-            args = work[1:]
-            result = calculator.__getattribute__(scifunction)(*args)
-            done_queue.put(result)
-
-    except Exception as e:
-        e_str = traceback.format_exc(e)
-        done_queue.put({'error': e_str})
diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
index 19de786..473f4ce 100644
--- a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
@@ -15,131 +15,152 @@
 
 
 import logging
+from datetime import datetime
 
 import numpy as np
+import shapely.geometry
 from nexustiles.nexustiles import NexusTileService
+from pytz import timezone
 
-# from time import time
-from webservice.NexusHandler import nexus_handler, SparkHandler, DEFAULT_PARAMETERS_SPEC
+from webservice.NexusHandler import nexus_handler, SparkHandler
 from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
 
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
 
 @nexus_handler
 class TimeAvgMapSparkHandlerImpl(SparkHandler):
     name = "Time Average Map Spark"
     path = "/timeAvgMapSpark"
     description = "Computes a Latitude/Longitude Time Average plot given an 
arbitrary geographical area and time range"
-    params = DEFAULT_PARAMETERS_SPEC
+    params = {
+        "ds": {
+            "name": "Dataset",
+            "type": "String",
+            "description": "The dataset used to generate the map. Required"
+        },
+        "startTime": {
+            "name": "Start Time",
+            "type": "string",
+            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+        },
+        "endTime": {
+            "name": "End Time",
+            "type": "string",
+            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+        },
+        "b": {
+            "name": "Bounding box",
+            "type": "comma-delimited float",
+            "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
+        },
+        "spark": {
+            "name": "Spark Configuration",
+            "type": "comma-delimited value",
+            "description": "Configuration used to launch in the Spark cluster. Value should be 3 elements separated by "
+                           "commas. 1) Spark Master 2) Number of Spark Executors 3) Number of Spark Partitions. Only "
+                           "Number of Spark Partitions is used by this function. Optional (Default: local,1,1)"
+        }
+    }
     singleton = True
 
     def __init__(self):
         SparkHandler.__init__(self)
         self.log = logging.getLogger(__name__)
-        # self.log.setLevel(logging.DEBUG)
 
-    @staticmethod
-    def _map(tile_in_spark):
-        tile_bounds = tile_in_spark[0]
-        (min_lat, max_lat, min_lon, max_lon,
-         min_y, max_y, min_x, max_x) = tile_bounds
-        startTime = tile_in_spark[1]
-        endTime = tile_in_spark[2]
-        ds = tile_in_spark[3]
-        tile_service = NexusTileService()
-        # print 'Started tile {0}'.format(tile_bounds)
-        # sys.stdout.flush()
-        tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
-        # days_at_a_time = 90
-        days_at_a_time = 30
-        # days_at_a_time = 7
-        # days_at_a_time = 1
-        # print 'days_at_a_time = {0}'.format(days_at_a_time)
-        t_incr = 86400 * days_at_a_time
-        sum_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.float64))
-        cnt_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.uint32))
-        t_start = startTime
-        while t_start <= endTime:
-            t_end = min(t_start + t_incr, endTime)
-            # t1 = time()
-            # print 'nexus call start at time {0}'.format(t1)
-            # sys.stdout.flush()
-            # nexus_tiles = \
-            #    TimeAvgMapSparkHandlerImpl.query_by_parts(tile_service,
-            #                                              min_lat, max_lat, 
-            #                                              min_lon, max_lon, 
-            #                                              ds, 
-            #                                              t_start, 
-            #                                              t_end,
-            #                                              part_dim=2)
-            nexus_tiles = \
-                tile_service.get_tiles_bounded_by_box(min_lat, max_lat,
-                                                      min_lon, max_lon,
-                                                      ds=ds,
-                                                      start_time=t_start,
-                                                      end_time=t_end)
-            # t2 = time()
-            # print 'nexus call end at time %f' % t2
-            # print 'secs in nexus call: ', t2 - t1
-            # print 't %d to %d - Got %d tiles' % (t_start, t_end,
-            #                                     len(nexus_tiles))
-            # for nt in nexus_tiles:
-            #    print nt.granule
-            #    print nt.section_spec
-            #    print 'lat min/max:', np.ma.min(nt.latitudes), np.ma.max(nt.latitudes)
-            #    print 'lon min/max:', np.ma.min(nt.longitudes), np.ma.max(nt.longitudes)
-            # sys.stdout.flush()
+    def parse_arguments(self, request):
+        # Parse input arguments
+        self.log.debug("Parsing arguments")
 
-            for tile in nexus_tiles:
-                tile.data.data[:, :] = np.nan_to_num(tile.data.data)
-                sum_tile += tile.data.data[0, min_y:max_y + 1, min_x:max_x + 1]
-                cnt_tile += (~tile.data.mask[0,
-                              min_y:max_y + 1,
-                              min_x:max_x + 1]).astype(np.uint8)
-            t_start = t_end + 1
+        try:
+            ds = request.get_dataset()
+            if type(ds) == list or type(ds) == tuple:
+                ds = next(iter(ds))
+        except:
+            raise NexusProcessingException(
+                reason="'ds' argument is required. Must be a string",
+                code=400)
+
+        # Do not allow time series on Climatology
+        if next(iter([clim for clim in ds if 'CLIM' in clim]), False):
+            raise NexusProcessingException(
+                reason="Cannot compute Latitude/Longitude Time Average plot on a climatology", code=400)
+
+        try:
+            bounding_polygon = request.get_bounding_polygon()
+            request.get_min_lon = lambda: bounding_polygon.bounds[0]
+            request.get_min_lat = lambda: bounding_polygon.bounds[1]
+            request.get_max_lon = lambda: bounding_polygon.bounds[2]
+            request.get_max_lat = lambda: bounding_polygon.bounds[3]
+        except:
+            try:
+                west, south, east, north = request.get_min_lon(), request.get_min_lat(), \
+                                           request.get_max_lon(), request.get_max_lat()
+                bounding_polygon = shapely.geometry.Polygon(
+                    [(west, south), (east, south), (east, north), (west, north), (west, south)])
+            except:
+                raise NexusProcessingException(
+                    reason="'b' argument is required. Must be comma-delimited float formatted as "
+                           "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+                    code=400)
+
+        try:
+            start_time = request.get_start_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'startTime' argument is required. Can be int value seconds from epoch or "
+                       "string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+        try:
+            end_time = request.get_end_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'endTime' argument is required. Can be int value seconds from epoch or "
+                       "string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+
+        if start_time > end_time:
+            raise NexusProcessingException(
+                reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+                    request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+                code=400)
+
+        spark_master, spark_nexecs, spark_nparts = request.get_spark_cfg()
+
+        start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
+        end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+
+        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
+               spark_master, spark_nexecs, spark_nparts
 
-        # print 'cnt_tile = ', cnt_tile
-        # cnt_tile.mask = ~(cnt_tile.data.astype(bool))
-        # sum_tile.mask = cnt_tile.mask
-        # avg_tile = sum_tile / cnt_tile
-        # stats_tile = [[{'avg': avg_tile.data[y,x], 'cnt': cnt_tile.data[y,x]} for x in range(tile_inbounds_shape[1])] for y in range(tile_inbounds_shape[0])]
-        # print 'Finished tile', tile_bounds
-        # print 'Tile avg = ', avg_tile
-        # sys.stdout.flush()
-        return ((min_lat, max_lat, min_lon, max_lon), (sum_tile, cnt_tile))
-
-    def calc(self, computeOptions, **args):
+    def calc(self, compute_options, **args):
         """
 
-        :param computeOptions: StatsComputeOptions
+        :param compute_options: StatsComputeOptions
         :param args: dict
         :return:
         """
 
-        spark_master, spark_nexecs, spark_nparts = computeOptions.get_spark_cfg()
-        self._setQueryParams(computeOptions.get_dataset()[0],
-                             (float(computeOptions.get_min_lat()),
-                              float(computeOptions.get_max_lat()),
-                              float(computeOptions.get_min_lon()),
-                              float(computeOptions.get_max_lon())),
-                             computeOptions.get_start_time(),
-                             computeOptions.get_end_time(),
+        ds, bbox, start_time, end_time, spark_master, spark_nexecs, spark_nparts = self.parse_arguments(compute_options)
+
+        compute_options.get_spark_cfg()
+
+        self._setQueryParams(ds,
+                             (float(bbox.bounds[1]),
+                              float(bbox.bounds[3]),
+                              float(bbox.bounds[0]),
+                              float(bbox.bounds[2])),
+                             start_time,
+                             end_time,
                              spark_master=spark_master,
                              spark_nexecs=spark_nexecs,
                              spark_nparts=spark_nparts)
 
-        if 'CLIM' in self._ds:
-            raise NexusProcessingException(
-                reason="Cannot compute Latitude/Longitude Time Average plot on a climatology", code=400)
-
         nexus_tiles = self._find_global_tile_set()
-        # print 'tiles:'
-        # for tile in nexus_tiles:
-        #     print tile.granule
-        #     print tile.section_spec
-        #     print 'lat:', tile.latitudes
-        #     print 'lon:', tile.longitudes
-
-        #                                                          nexus_tiles)
+
         if len(nexus_tiles) == 0:
             raise NoDataException(reason="No data found for selected 
timeframe")
 
@@ -152,14 +173,11 @@ def calc(self, computeOptions, **args):
         self.log.debug('center lon range = {0} to {1}'.format(self._minLonCent,
                                                               self._maxLonCent))
 
-        # for tile in nexus_tiles:
-        #    print 'lats: ', tile.latitudes.compressed()
-        #    print 'lons: ', tile.longitudes.compressed()
         # Create array of tuples to pass to Spark map function
         nexus_tiles_spark = [[self._find_tile_bounds(t),
                               self._startTime, self._endTime,
                               self._ds] for t in nexus_tiles]
-        # print 'nexus_tiles_spark = ', nexus_tiles_spark
+
         # Remove empty tiles (should have bounds set to None)
         bad_tile_inds = np.where([t[0] is None for t in nexus_tiles_spark])[0]
         for i in np.flipud(bad_tile_inds):
@@ -168,7 +186,7 @@ def calc(self, computeOptions, **args):
         # Expand Spark map tuple array by duplicating each entry N times,
         # where N is the number of ways we want the time dimension carved up.
         num_time_parts = 72
-        # num_time_parts = 1
+
         nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, axis=0)
         self.log.debug('repeated len(nexus_tiles_spark) = {0}'.format(len(nexus_tiles_spark)))
 
@@ -183,9 +201,6 @@ def calc(self, computeOptions, **args):
                       len(nexus_tiles_spark) / num_time_parts, axis=0).reshape((len(nexus_tiles_spark), 2))
         self.log.debug('spark_part_time_ranges={0}'.format(spark_part_time_ranges))
         nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
-        # print 'nexus_tiles_spark final = '
-        # for i in range(len(nexus_tiles_spark)):
-        #    print nexus_tiles_spark[i]
 
         # Launch Spark computations
         rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts)
@@ -246,13 +261,48 @@ def calc(self, computeOptions, **args):
         self._create_nc_file(a, 'tam.nc', 'val', fill=self._fill)
 
         # Create dict for JSON response
-        results = [[{'avg': a[y, x], 'cnt': int(n[y, x]),
+        results = [[{'mean': a[y, x], 'cnt': int(n[y, x]),
                      'lat': self._ind2lat(y), 'lon': self._ind2lon(x)}
                     for x in range(a.shape[1])] for y in range(a.shape[0])]
 
-        return TimeAvgMapSparkResults(results=results, meta={}, computeOptions=computeOptions)
+        return NexusResults(results=results, meta={}, stats=None,
+                            computeOptions=None, minLat=bbox.bounds[1],
+                            maxLat=bbox.bounds[3], minLon=bbox.bounds[0],
+                            maxLon=bbox.bounds[2], ds=ds, startTime=start_time,
+                            endTime=end_time)
+
+    @staticmethod
+    def _map(tile_in_spark):
+        tile_bounds = tile_in_spark[0]
+        (min_lat, max_lat, min_lon, max_lon,
+         min_y, max_y, min_x, max_x) = tile_bounds
+        startTime = tile_in_spark[1]
+        endTime = tile_in_spark[2]
+        ds = tile_in_spark[3]
+        tile_service = NexusTileService()
+
+        tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
+
+        days_at_a_time = 30
+
+        t_incr = 86400 * days_at_a_time
+        sum_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.float64))
+        cnt_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.uint32))
+        t_start = startTime
+        while t_start <= endTime:
+            t_end = min(t_start + t_incr, endTime)
+
+            nexus_tiles = \
+                tile_service.get_tiles_bounded_by_box(min_lat, max_lat,
+                                                      min_lon, max_lon,
+                                                      ds=ds,
+                                                      start_time=t_start,
+                                                      end_time=t_end)
 
+            for tile in nexus_tiles:
+                tile.data.data[:, :] = np.nan_to_num(tile.data.data)
+                sum_tile += tile.data.data[0, min_y:max_y + 1, min_x:max_x + 1]
+                cnt_tile += (~tile.data.mask[0, min_y:max_y + 1, min_x:max_x + 1]).astype(np.uint8)
+            t_start = t_end + 1
 
-class TimeAvgMapSparkResults(NexusResults):
-    def __init__(self, results=None, meta=None, computeOptions=None):
-        NexusResults.__init__(self, results=results, meta=meta, stats=None, computeOptions=computeOptions)
+        return (min_lat, max_lat, min_lon, max_lon), (sum_tile, cnt_tile)
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index e89a00e..a24c2d5 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -505,6 +505,10 @@ def spark_driver(daysinrange, bounding_polygon, ds, fill=-9999., spark_nparts_ne
 
 def calc_average_on_day(tile_in_spark):
     import shapely.wkt
+    from datetime import datetime
+    from pytz import timezone
+    ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
     (bounding_wkt, dataset, timestamps, fill) = tile_in_spark
     if len(timestamps) == 0:
         return []
@@ -561,7 +565,8 @@ def calc_average_on_day(tile_in_spark):
             'mean': daily_mean,
             'cnt': data_count,
             'std': data_std,
-            'time': int(timeinseconds)
+            'time': int(timeinseconds),
+            'iso_time': datetime.utcfromtimestamp(int(timeinseconds)).replace(tzinfo=timezone('UTC')).strftime(ISO_8601)
         }
         stats_arr.append(stat)
     return stats_arr
diff --git a/analysis/webservice/webmodel.py b/analysis/webservice/webmodel.py
index e75ac01..2b61b5f 100644
--- a/analysis/webservice/webmodel.py
+++ b/analysis/webservice/webmodel.py
@@ -27,6 +27,7 @@
 from shapely.geometry import Polygon
 
 EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
 
 
 class RequestParameters(object):
@@ -422,7 +423,9 @@ def _extendMeta(self, meta, minLat, maxLat, minLon, maxLon, ds, startTime, endTi
         }
         meta["time"] = {
             "start": startTime,
-            "stop": endTime
+            "stop": endTime,
+            "iso_start": datetime.utcfromtimestamp(int(startTime)).replace(tzinfo=timezone('UTC')).strftime(ISO_8601),
+            "iso_stop": datetime.utcfromtimestamp(int(endTime)).replace(tzinfo=timezone('UTC')).strftime(ISO_8601)
         }
         return meta
 
diff --git a/data-access/nexustiles/dao/SolrProxy.pyx b/data-access/nexustiles/dao/SolrProxy.pyx
index 20dfeeb..0a36707 100644
--- a/data-access/nexustiles/dao/SolrProxy.pyx
+++ b/data-access/nexustiles/dao/SolrProxy.pyx
@@ -28,6 +28,8 @@ SOLR_CON_LOCK = threading.Lock()
 thread_local = threading.local()
 
 EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+SOLR_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
 
 
 class SolrProxy(object):
@@ -156,8 +158,12 @@ class SolrProxy(object):
         datasets = self.get_data_series_list_simple()
 
         for dataset in datasets:
-            dataset['start'] = (self.find_min_date_from_tiles([], ds=dataset['title']) - EPOCH).total_seconds() * 1000
-            dataset['end'] = (self.find_max_date_from_tiles([], ds=dataset['title']) - EPOCH).total_seconds() * 1000
+            min_date = self.find_min_date_from_tiles([], ds=dataset['title'])
+            max_date = self.find_max_date_from_tiles([], ds=dataset['title'])
+            dataset['start'] = (min_date - EPOCH).total_seconds()
+            dataset['end'] = (max_date - EPOCH).total_seconds()
+            dataset['iso_start'] = min_date.strftime(ISO_8601)
+            dataset['iso_end'] = max_date.strftime(ISO_8601)
 
         return datasets
 
@@ -236,8 +242,8 @@ class SolrProxy(object):
 
         search = 'dataset_s:%s' % ds
 
-        search_start_s = datetime.utcfromtimestamp(start_time).strftime('%Y-%m-%dT%H:%M:%SZ')
-        search_end_s = datetime.utcfromtimestamp(end_time).strftime('%Y-%m-%dT%H:%M:%SZ')
+        search_start_s = datetime.utcfromtimestamp(start_time).strftime(SOLR_FORMAT)
+        search_end_s = datetime.utcfromtimestamp(end_time).strftime(SOLR_FORMAT)
 
         additionalparams = {
             'fq': [
@@ -258,7 +264,7 @@ class SolrProxy(object):
         response = self.do_query_raw(*(search, None, None, False, None), **additionalparams)
 
         daysinrangeasc = sorted(
-            [(datetime.strptime(a_date, '%Y-%m-%dT%H:%M:%SZ') - datetime.utcfromtimestamp(0)).total_seconds() for a_date
+            [(datetime.strptime(a_date, SOLR_FORMAT) - datetime.utcfromtimestamp(0)).total_seconds() for a_date
              in response.facets['facet_fields']['tile_min_time_dt'][::2]])
 
         return daysinrangeasc
@@ -276,8 +282,8 @@ class SolrProxy(object):
         }
 
         if 0 < start_time <= end_time:
-            search_start_s = datetime.utcfromtimestamp(start_time).strftime('%Y-%m-%dT%H:%M:%SZ')
-            search_end_s = datetime.utcfromtimestamp(end_time).strftime('%Y-%m-%dT%H:%M:%SZ')
+            search_start_s = datetime.utcfromtimestamp(start_time).strftime(SOLR_FORMAT)
+            search_end_s = datetime.utcfromtimestamp(end_time).strftime(SOLR_FORMAT)
 
             time_clause = "(" \
                           "tile_min_time_dt:[%s TO %s] " \
@@ -308,8 +314,8 @@ class SolrProxy(object):
         }
 
         if 0 < start_time <= end_time:
-            search_start_s = datetime.utcfromtimestamp(start_time).strftime('%Y-%m-%dT%H:%M:%SZ')
-            search_end_s = datetime.utcfromtimestamp(end_time).strftime('%Y-%m-%dT%H:%M:%SZ')
+            search_start_s = datetime.utcfromtimestamp(start_time).strftime(SOLR_FORMAT)
+            search_end_s = datetime.utcfromtimestamp(end_time).strftime(SOLR_FORMAT)
 
             time_clause = "(" \
                           "tile_min_time_dt:[%s TO %s] " \
@@ -340,8 +346,8 @@ class SolrProxy(object):
         }
 
         if 0 < start_time <= end_time:
-            search_start_s = datetime.utcfromtimestamp(start_time).strftime('%Y-%m-%dT%H:%M:%SZ')
-            search_end_s = datetime.utcfromtimestamp(end_time).strftime('%Y-%m-%dT%H:%M:%SZ')
+            search_start_s = datetime.utcfromtimestamp(start_time).strftime(SOLR_FORMAT)
+            search_end_s = datetime.utcfromtimestamp(end_time).strftime(SOLR_FORMAT)
 
             time_clause = "(" \
                           "tile_min_time_dt:[%s TO %s] " \
@@ -377,8 +383,8 @@ class SolrProxy(object):
         }
 
         if 0 < start_time <= end_time:
-            search_start_s = datetime.utcfromtimestamp(start_time).strftime('%Y-%m-%dT%H:%M:%SZ')
-            search_end_s = datetime.utcfromtimestamp(end_time).strftime('%Y-%m-%dT%H:%M:%SZ')
+            search_start_s = datetime.utcfromtimestamp(start_time).strftime(SOLR_FORMAT)
+            search_end_s = datetime.utcfromtimestamp(end_time).strftime(SOLR_FORMAT)
 
             time_clause = "(" \
                           "tile_min_time_dt:[%s TO %s] " \
@@ -414,8 +420,8 @@ class SolrProxy(object):
         }
 
         if 0 < start_time <= end_time:
-            search_start_s = datetime.utcfromtimestamp(start_time).strftime('%Y-%m-%dT%H:%M:%SZ')
-            search_end_s = datetime.utcfromtimestamp(end_time).strftime('%Y-%m-%dT%H:%M:%SZ')
+            search_start_s = datetime.utcfromtimestamp(start_time).strftime(SOLR_FORMAT)
+            search_end_s = datetime.utcfromtimestamp(end_time).strftime(SOLR_FORMAT)
 
             time_clause = "(" \
                           "tile_min_time_dt:[%s TO %s] " \
@@ -437,7 +443,7 @@ class SolrProxy(object):
     def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, ds, search_time, **kwargs):
         search = 'dataset_s:%s' % ds
 
-        the_time = datetime.utcfromtimestamp(search_time).strftime('%Y-%m-%dT%H:%M:%SZ')
+        the_time = datetime.utcfromtimestamp(search_time).strftime(SOLR_FORMAT)
         time_clause = "(" \
                       "tile_min_time_dt:[* TO %s] " \
                       "AND tile_max_time_dt:[%s TO *] " \
@@ -460,7 +466,7 @@ class SolrProxy(object):
     def find_all_tiles_in_polygon_at_time(self, bounding_polygon, ds, search_time, **kwargs):
         search = 'dataset_s:%s' % ds
 
-        the_time = datetime.utcfromtimestamp(search_time).strftime('%Y-%m-%dT%H:%M:%SZ')
+        the_time = datetime.utcfromtimestamp(search_time).strftime(SOLR_FORMAT)
         time_clause = "(" \
                       "tile_min_time_dt:[* TO %s] " \
                       "AND tile_max_time_dt:[%s TO *] " \
@@ -483,7 +489,7 @@ class SolrProxy(object):
     def find_all_tiles_within_box_at_time(self, min_lat, max_lat, min_lon, max_lon, ds, time, **kwargs):
         search = 'dataset_s:%s' % ds
 
-        the_time = datetime.utcfromtimestamp(time).strftime('%Y-%m-%dT%H:%M:%SZ')
+        the_time = datetime.utcfromtimestamp(time).strftime(SOLR_FORMAT)
         time_clause = "(" \
                       "tile_min_time_dt:[* TO %s] " \
                       "AND tile_max_time_dt:[%s TO *] " \
@@ -507,7 +513,7 @@ class SolrProxy(object):
     def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, ds, time, **kwargs):
         search = 'dataset_s:%s' % ds
 
-        the_time = datetime.utcfromtimestamp(time).strftime('%Y-%m-%dT%H:%M:%SZ')
+        the_time = datetime.utcfromtimestamp(time).strftime(SOLR_FORMAT)
         time_clause = "(" \
                       "tile_min_time_dt:[* TO %s] " \
                       "AND tile_max_time_dt:[%s TO *] " \
@@ -555,8 +561,8 @@ class SolrProxy(object):
             **additionalparams)
 
     def get_formatted_time_clause(self, start_time, end_time):
-        search_start_s = datetime.utcfromtimestamp(start_time).strftime('%Y-%m-%dT%H:%M:%SZ')
-        search_end_s = datetime.utcfromtimestamp(end_time).strftime('%Y-%m-%dT%H:%M:%SZ')
+        search_start_s = datetime.utcfromtimestamp(start_time).strftime(SOLR_FORMAT)
+        search_end_s = datetime.utcfromtimestamp(end_time).strftime(SOLR_FORMAT)
 
         time_clause = "(" \
                       "tile_min_time_dt:[%s TO %s] " \
@@ -651,7 +657,7 @@ class SolrProxy(object):
         return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC)
 
     def convert_iso_to_timestamp(self, date):
-        return (self.convert_iso_to_datetime(date) - EPOCH).total_seconds() * 1000
+        return (self.convert_iso_to_datetime(date) - EPOCH).total_seconds()
 
     def ping(self):
         solrAdminPing = 'http://%s/solr/%s/admin/ping' % (self.solrUrl, self.solrCore)


 

