This is an automated email from the ASF dual-hosted git repository.

jjacob pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f05da6  SDAP-220: temporal variance algorithm (#93)
8f05da6 is described below

commit 8f05da6372f7498c5e170a29c816d654ae74adfa
Author: Maya DeBellis <[email protected]>
AuthorDate: Mon Apr 6 14:54:32 2020 -0700

    SDAP-220: temporal variance algorithm (#93)
    
    * add data anomaly algorithm
    
    * add temporal variance algorithm
    
    * remove incorrect data_anomaly
    
    Co-authored-by: Maya Debellis <[email protected]>
---
 .../webservice/algorithms_spark/VarianceSpark.py   | 397 +++++++++++++++++++++
 analysis/webservice/algorithms_spark/__init__.py   |   5 +
 2 files changed, 402 insertions(+)

diff --git a/analysis/webservice/algorithms_spark/VarianceSpark.py 
b/analysis/webservice/algorithms_spark/VarianceSpark.py
new file mode 100644
index 0000000..07b055e
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/VarianceSpark.py
@@ -0,0 +1,397 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import math
+import logging
+from datetime import datetime
+
+import numpy as np
+import shapely.geometry
+from nexustiles.nexustiles import NexusTileService
+from pytz import timezone
+
+from webservice.NexusHandler import nexus_handler, SparkHandler
+from webservice.webmodel import NexusResults, NexusProcessingException, 
NoDataException
+
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+@nexus_handler
+class VarianceSparkHandlerImpl(SparkHandler):
+    name = "Temporal Variance Spark"
+    path = "/varianceSpark"
+    description = "Computes a map of the temporal variance"
+    params = {
+        "ds": {
+            "name": "Dataset",
+            "type": "String",
+            "description": "The dataset used to generate the map. Required"
+        },
+        "startTime": {
+            "name": "Start Time",
+            "type": "string",
+            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or 
seconds since EPOCH. Required"
+        },
+        "endTime": {
+            "name": "End Time",
+            "type": "string",
+            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or 
seconds since EPOCH. Required"
+        },
+        "b": {
+            "name": "Bounding box",
+            "type": "comma-delimited float",
+            "description": "Minimum (Western) Longitude, Minimum (Southern) 
Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) 
Latitude. Required"
+        },
+        "spark": {
+            "name": "Spark Configuration",
+            "type": "comma-delimited value",
+            "description": "Configuration used to launch in the Spark cluster. 
Value should be 3 elements separated by "
+                           "commas. 1) Spark Master 2) Number of Spark 
Executors 3) Number of Spark Partitions. Only "
+                           "Number of Spark Partitions is used by this 
function. Optional (Default: local,1,1)"
+        }
+    }
+    singleton = True
+
+    def __init__(self):
+        SparkHandler.__init__(self)
+        self.log = logging.getLogger(__name__)
+
+    def parse_arguments(self, request):
+        # Parse input arguments
+        self.log.debug("Parsing arguments")
+
+        try:
+            ds = request.get_dataset()
+            if type(ds) == list or type(ds) == tuple:
+                ds = next(iter(ds))
+        except:
+            raise NexusProcessingException(
+                reason="'ds' argument is required. Must be a string",
+                code=400)
+
+        # Do not allow time series on Climatology
+        if next(iter([clim for clim in ds if 'CLIM' in clim]), False):
+            raise NexusProcessingException(
+                reason="Cannot compute Latitude/Longitude Time Average plot on 
a climatology", code=400)
+
+        try:
+            bounding_polygon = request.get_bounding_polygon()
+            request.get_min_lon = lambda: bounding_polygon.bounds[0]
+            request.get_min_lat = lambda: bounding_polygon.bounds[1]
+            request.get_max_lon = lambda: bounding_polygon.bounds[2]
+            request.get_max_lat = lambda: bounding_polygon.bounds[3]
+        except:
+            try:
+                west, south, east, north = request.get_min_lon(), 
request.get_min_lat(), \
+                                           request.get_max_lon(), 
request.get_max_lat()
+                bounding_polygon = shapely.geometry.Polygon(
+                    [(west, south), (east, south), (east, north), (west, 
north), (west, south)])
+            except:
+                raise NexusProcessingException(
+                    reason="'b' argument is required. Must be comma-delimited 
float formatted as "
+                           "Minimum (Western) Longitude, Minimum (Southern) 
Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) 
Latitude",
+                    code=400)
+
+        try:
+            start_time = request.get_start_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'startTime' argument is required. Can be int value 
seconds from epoch or "
+                       "string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+        try:
+            end_time = request.get_end_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'endTime' argument is required. Can be int value 
seconds from epoch or "
+                       "string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+
+        if start_time > end_time:
+            raise NexusProcessingException(
+                reason="The starting time must be before the ending time. 
Received startTime: %s, endTime: %s" % (
+                    request.get_start_datetime().strftime(ISO_8601), 
request.get_end_datetime().strftime(ISO_8601)),
+                code=400)
+
+        nparts_requested = request.get_nparts()
+
+        start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
+        end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+
+        return ds, bounding_polygon, start_seconds_from_epoch, 
end_seconds_from_epoch, nparts_requested
+
+    def calc(self, compute_options, **args):
+        """
+
+        :param compute_options: StatsComputeOptions
+        :param args: dict
+        :return:
+        """
+
+        ds, bbox, start_time, end_time, nparts_requested = 
self.parse_arguments(compute_options)
+        self._setQueryParams(ds,
+                             (float(bbox.bounds[1]),
+                              float(bbox.bounds[3]),
+                              float(bbox.bounds[0]),
+                              float(bbox.bounds[2])),
+                             start_time,
+                             end_time)
+
+        nexus_tiles = self._find_global_tile_set()
+
+        if len(nexus_tiles) == 0:
+            raise NoDataException(reason="No data found for selected 
timeframe")
+
+        self.log.debug('Found {0} tiles'.format(len(nexus_tiles)))
+        print('Found {} tiles'.format(len(nexus_tiles)))
+
+        daysinrange = self._tile_service.find_days_in_range_asc(bbox.bounds[1],
+                                                                bbox.bounds[3],
+                                                                bbox.bounds[0],
+                                                                bbox.bounds[2],
+                                                                ds,
+                                                                start_time,
+                                                                end_time)
+        ndays = len(daysinrange)
+        if ndays == 0:
+            raise NoDataException(reason="No data found for selected 
timeframe")
+        self.log.debug('Found {0} days in range'.format(ndays))
+        for i, d in enumerate(daysinrange):
+            self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))
+
+
+        self.log.debug('Using Native resolution: lat_res={0}, 
lon_res={1}'.format(self._latRes, self._lonRes))
+        self.log.debug('nlats={0}, nlons={1}'.format(self._nlats, self._nlons))
+        self.log.debug('center lat range = {0} to {1}'.format(self._minLatCent,
+                                                              
self._maxLatCent))
+        self.log.debug('center lon range = {0} to {1}'.format(self._minLonCent,
+                                                              
self._maxLonCent))
+
+        # Create array of tuples to pass to Spark map function
+        nexus_tiles_spark = [[self._find_tile_bounds(t),
+                              self._startTime, self._endTime,
+                              self._ds] for t in nexus_tiles]
+
+        # Remove empty tiles (should have bounds set to None)
+        bad_tile_inds = np.where([t[0] is None for t in nexus_tiles_spark])[0]
+        for i in np.flipud(bad_tile_inds):
+            del nexus_tiles_spark[i]
+
+        # Expand Spark map tuple array by duplicating each entry N times,
+        # where N is the number of ways we want the time dimension carved up.
+        # Set the time boundaries for each of the Spark map tuples so that
+        # every Nth element in the array gets the same time bounds.
+        max_time_parts = 72
+        num_time_parts = min(max_time_parts, ndays)
+
+        spark_part_time_ranges = np.tile(np.array([a[[0,-1]] for a in 
np.array_split(np.array(daysinrange), num_time_parts)]), 
(len(nexus_tiles_spark),1))
+        nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, 
axis=0)
+        nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
+
+        # Launch Spark computations to calculate x_bar
+        spark_nparts = self._spark_nparts(nparts_requested)
+        self.log.info('Using {} partitions'.format(spark_nparts))
+
+        rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
+        sum_count_part = rdd.map(self._map)
+        sum_count = \
+            sum_count_part.combineByKey(lambda val: val,
+                                        lambda x, val: (x[0] + val[0],
+                                                        x[1] + val[1]),
+                                        lambda x, y: (x[0] + y[0], x[1] + 
y[1]))
+        fill = self._fill
+        avg_tiles = \
+            sum_count.map(lambda (bounds, (sum_tile, cnt_tile)):
+                          (bounds, [[(sum_tile[y, x] / cnt_tile[y, x])
+                          if (cnt_tile[y, x] > 0)
+                          else fill
+                                     for x in
+                                     range(sum_tile.shape[1])]
+                                    for y in
+                                    range(sum_tile.shape[0])])).collect()
+
+        #
+        # Launch a second parallel computation to calculate variance from x_bar
+        #
+
+        # Create array of tuples to pass to Spark map function - first param 
are the tile bounds that were in the
+        # results and the last param is the data for the results (x bar)
+        nexus_tiles_spark = [[t[0], self._startTime, self._endTime, self._ds, 
t[1]] for t in avg_tiles]
+
+        self.log.info('Using {} partitions'.format(spark_nparts))
+        rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
+
+        anomaly_squared_part = rdd.map(self._calc_variance)
+        anomaly_squared = \
+            anomaly_squared_part.combineByKey(lambda val: val,
+                                        lambda x, val: (x[0] + val[0],
+                                                        x[1] + val[1]),
+                                        lambda x, y: (x[0] + y[0], x[1] + 
y[1]))
+
+        variance_tiles = \
+            anomaly_squared.map(lambda (bounds, (anomaly_squared_tile, 
cnt_tile)):
+                               (bounds, [[{'variance': 
(anomaly_squared_tile[y, x] / cnt_tile[y, x])
+                               if (cnt_tile[y, x] > 0)
+                               else fill,
+                                           'cnt': cnt_tile[y, x]}
+                                       for x in 
range(anomaly_squared_tile.shape[1])]
+                                       for y in 
range(anomaly_squared_tile.shape[0])])).collect()
+
+
+
+        # Combine subset results to produce global map.
+        #
+        # The tiles below are NOT Nexus objects.  They are tuples
+        # with the time avg map data and lat-lon bounding box.
+        a = np.zeros((self._nlats, self._nlons), dtype=np.float64, order='C')
+        n = np.zeros((self._nlats, self._nlons), dtype=np.uint32, order='C')
+        for tile in variance_tiles:
+            if tile is not None:
+                ((tile_min_lat, tile_max_lat, tile_min_lon, tile_max_lon),
+                 tile_stats) = tile
+                tile_data = np.ma.array(
+                    [[tile_stats[y][x]['variance'] for x in 
range(len(tile_stats[0]))] for y in range(len(tile_stats))])
+                tile_cnt = np.array(
+                    [[tile_stats[y][x]['cnt'] for x in 
range(len(tile_stats[0]))] for y in range(len(tile_stats))])
+                tile_data.mask = ~(tile_cnt.astype(bool))
+                y0 = self._lat2ind(tile_min_lat)
+                y1 = y0 + tile_data.shape[0] - 1
+                x0 = self._lon2ind(tile_min_lon)
+                x1 = x0 + tile_data.shape[1] - 1
+                if np.any(np.logical_not(tile_data.mask)):
+                    self.log.debug(
+                        'writing tile lat {0}-{1}, lon {2}-{3}, map y {4}-{5}, 
map x {6}-{7}'.format(tile_min_lat,
+                                                                               
                      tile_max_lat,
+                                                                               
                      tile_min_lon,
+                                                                               
                      tile_max_lon, y0,
+                                                                               
                      y1, x0, x1))
+                    a[y0:y1 + 1, x0:x1 + 1] = tile_data
+                    n[y0:y1 + 1, x0:x1 + 1] = tile_cnt
+                else:
+                    self.log.debug(
+                        'All pixels masked in tile lat {0}-{1}, lon {2}-{3}, 
map y {4}-{5}, map x {6}-{7}'.format(
+                            tile_min_lat, tile_max_lat,
+                            tile_min_lon, tile_max_lon,
+                            y0, y1, x0, x1))
+
+        # Store global map in a NetCDF file.
+        self._create_nc_file(a, 'tam.nc', 'val', fill=self._fill)
+
+        # Create dict for JSON response
+        results = [[{'variance': a[y, x], 'cnt': int(n[y, x]),
+                     'lat': self._ind2lat(y), 'lon': self._ind2lon(x)}
+                    for x in range(a.shape[1])] for y in range(a.shape[0])]
+
+        return NexusResults(results=results, meta={}, stats=None,
+                            computeOptions=None, minLat=bbox.bounds[1],
+                            maxLat=bbox.bounds[3], minLon=bbox.bounds[0],
+                            maxLon=bbox.bounds[2], ds=ds, startTime=start_time,
+                            endTime=end_time)
+
+    @staticmethod
+    def _map(tile_in_spark):
+        # tile_in_spark is a spatial tile that corresponds to nexus tiles of 
the same area
+        tile_bounds = tile_in_spark[0]
+        (min_lat, max_lat, min_lon, max_lon,
+         min_y, max_y, min_x, max_x) = tile_bounds
+        startTime = tile_in_spark[1]
+        endTime = tile_in_spark[2]
+        ds = tile_in_spark[3]
+        tile_service = NexusTileService()
+
+        tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
+
+        # hardcorded - limiting the amount of nexus tiles pulled at a time
+        days_at_a_time = 30
+
+        t_incr = 86400 * days_at_a_time
+        sum_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.float64))
+        cnt_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.uint32))
+        t_start = startTime
+        while t_start <= endTime:
+            t_end = min(t_start + t_incr, endTime)
+
+            nexus_tiles = \
+                tile_service.get_tiles_bounded_by_box(min_lat, max_lat,
+                                                      min_lon, max_lon,
+                                                      ds=ds,
+                                                      start_time=t_start,
+                                                      end_time=t_end)
+
+            for tile in nexus_tiles:
+                # Taking the data, converted masked nans to 0
+                tile.data.data[:, :] = np.nan_to_num(tile.data.data)
+                sum_tile += tile.data.data[0, min_y:max_y + 1, min_x:max_x + 1]
+                # Taking the opposite of the value of the bool of mask - add 0 
if it's a masked value
+                cnt_tile += (~tile.data.mask[0, min_y:max_y + 1, min_x:max_x + 
1]).astype(np.uint8)
+            t_start = t_end + 1
+
+        print("sum tile", sum_tile)
+        print("count tile", cnt_tile)
+        return tile_bounds, (sum_tile, cnt_tile)
+
+    @staticmethod
+    def _calc_variance(tile_in_spark):
+        # tile_in_spark is a spatial tile that corresponds to nexus tiles of 
the same area
+        tile_bounds = tile_in_spark[0]
+        (min_lat, max_lat, min_lon, max_lon,
+         min_y, max_y, min_x, max_x) = tile_bounds
+        startTime = tile_in_spark[1]
+        endTime = tile_in_spark[2]
+        ds = tile_in_spark[3]
+        x_bar = tile_in_spark[4]
+        tile_service = NexusTileService()
+
+        tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
+
+        # hardcorded - limiting the amount of nexus tiles pulled at a time
+        days_at_a_time = 30
+
+        t_incr = 86400 * days_at_a_time
+        data_anomaly_squared_tile = np.array(np.zeros(tile_inbounds_shape, 
dtype=np.float64))
+        cnt_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.uint32))
+
+        x_bar = np.asarray(x_bar)
+        x_bar[:, :] = np.nan_to_num(x_bar)
+
+        t_start = startTime
+        while t_start <= endTime:
+            t_end = min(t_start + t_incr, endTime)
+
+            nexus_tiles = \
+                tile_service.get_tiles_bounded_by_box(min_lat, max_lat,
+                                                      min_lon, max_lon,
+                                                      ds=ds,
+                                                      start_time=t_start,
+                                                      end_time=t_end)
+
+            for tile in nexus_tiles:
+                # Taking the data, converted masked nans to 0
+                tile.data.data[:, :] = np.nan_to_num(tile.data.data)
+
+                # subtract x_bar from each value, then square it
+                data_anomaly_tile = tile.data.data[0, min_y:max_y + 1, 
min_x:max_x + 1] - x_bar
+                data_anomaly_squared_tile += data_anomaly_tile * 
data_anomaly_tile
+
+                # Taking the opposite of the value of the bool of mask - add 0 
if it's a masked value
+                cnt_tile += (~tile.data.mask[0, min_y:max_y + 1, min_x:max_x + 
1]).astype(np.uint8)
+            t_start = t_end + 1
+
+        return (min_lat, max_lat, min_lon, max_lon), 
(data_anomaly_squared_tile, cnt_tile)
+
diff --git a/analysis/webservice/algorithms_spark/__init__.py 
b/analysis/webservice/algorithms_spark/__init__.py
index 656c949..159af8e 100644
--- a/analysis/webservice/algorithms_spark/__init__.py
+++ b/analysis/webservice/algorithms_spark/__init__.py
@@ -46,6 +46,11 @@ if module_exists("pyspark"):
         pass
 
     try:
+        import VarianceSpark
+    except ImportError:
+        pass
+
+    try:
         import TimeSeriesSpark
     except ImportError:
         pass

Reply via email to