This is an automated email from the ASF dual-hosted git repository.

nchung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/master by this push:
     new cf2149b  SDAP-258: Use pre-computed climatologies for deseason 
algorithm (#109)
cf2149b is described below

commit cf2149b6d88fb521eb473f983b7780979dfa5c2a
Author: Eamon Ford <[email protected]>
AuthorDate: Tue Oct 13 15:01:10 2020 -0700

    SDAP-258: Use pre-computed climatologies for deseason algorithm (#109)
---
 .../webservice/algorithms_spark/TimeSeriesSpark.py | 72 ++++++++++++++--------
 1 file changed, 47 insertions(+), 25 deletions(-)

diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py 
b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index 43f7f6d..422fdb8 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -38,6 +38,7 @@ from webservice.webmodel import NexusResults, 
NoDataException, NexusProcessingEx
 
 EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
 ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+SECONDS_IN_ONE_YEAR = 31535999
 
 logger = logging.getLogger(__name__)
 
@@ -117,7 +118,7 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
         except:
             try:
                 west, south, east, north = request.get_min_lon(), 
request.get_min_lat(), \
-                                           request.get_max_lon(), 
request.get_max_lat()
+                    request.get_max_lon(), request.get_max_lat()
                 bounding_polygon = shapely.geometry.Polygon(
                     [(west, south), (east, south), (east, north), (west, 
north), (west, south)])
             except:
@@ -160,7 +161,7 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
 
     def calc(self, request, **args):
         """
-    
+
         :param request: StatsComputeOptions
         :param args: dict
         :return:
@@ -176,13 +177,13 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
 
             the_time = datetime.now()
             daysinrange = 
self._get_tile_service().find_days_in_range_asc(bounding_polygon.bounds[1],
-                                                                    
bounding_polygon.bounds[3],
-                                                                    
bounding_polygon.bounds[0],
-                                                                    
bounding_polygon.bounds[2],
-                                                                    shortName,
-                                                                    
start_seconds_from_epoch,
-                                                                    
end_seconds_from_epoch,
-                                                                    
metrics_callback=metrics_record.record_metrics)
+                                                                          
bounding_polygon.bounds[3],
+                                                                          
bounding_polygon.bounds[0],
+                                                                          
bounding_polygon.bounds[2],
+                                                                          
shortName,
+                                                                          
start_seconds_from_epoch,
+                                                                          
end_seconds_from_epoch,
+                                                                          
metrics_callback=metrics_record.record_metrics)
             self.log.info("Finding days in range took %s for dataset %s" % 
(str(datetime.now() - the_time), shortName))
 
             ndays = len(daysinrange)
@@ -203,16 +204,37 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
 
             if apply_seasonal_cycle_filter:
                 the_time = datetime.now()
+                # get time series for _clim dataset
+                shortName_clim = shortName + "_clim"
+                daysinrange_clim = 
self._get_tile_service().find_days_in_range_asc(bounding_polygon.bounds[1],
+                                                                               
    bounding_polygon.bounds[3],
+                                                                               
    bounding_polygon.bounds[0],
+                                                                               
    bounding_polygon.bounds[2],
+                                                                               
    shortName_clim,
+                                                                               
    0,
+                                                                               
    SECONDS_IN_ONE_YEAR,
+                                                                               
    metrics_callback=metrics_record.record_metrics)
+                if len(daysinrange_clim) == 0:
+                    raise NexusProcessingException(reason="There is no 
climatology data present for dataset " + shortName + ".") 
+                results_clim, _ = spark_driver(daysinrange_clim,
+                                               bounding_polygon,
+                                               shortName_clim,
+                                               self._tile_service_factory,
+                                               metrics_record.record_metrics,
+                                               spark_nparts=spark_nparts,
+                                               sc=self._sc)
+                clim_indexed_by_month = 
{datetime.utcfromtimestamp(result['time']).month: result for result in 
results_clim}
+                if len(clim_indexed_by_month) < 12:
+                    raise NexusProcessingException(reason="There are only " +
+                                                   len(clim_indexed_by_month) 
+ " months of climatology data for dataset " + 
+                                                   shortName + ". A full year 
of climatology data is required for computing deseasoned timeseries.")
+
                 for result in results:
                     month = datetime.utcfromtimestamp(result['time']).month
-                    month_mean, month_max, month_min = 
self.calculate_monthly_average(month, bounding_polygon.wkt,
-                                                                               
       shortName)
-                    seasonal_mean = result['mean'] - month_mean
-                    seasonal_min = result['min'] - month_min
-                    seasonal_max = result['max'] - month_max
-                    result['meanSeasonal'] = seasonal_mean
-                    result['minSeasonal'] = seasonal_min
-                    result['maxSeasonal'] = seasonal_max
+
+                    result['meanSeasonal'] = result['mean'] - 
clim_indexed_by_month[month]['mean']
+                    result['minSeasonal'] = result['min'] - 
clim_indexed_by_month[month]['min']
+                    result['maxSeasonal'] = result['max'] - 
clim_indexed_by_month[month]['max']
                 self.log.info(
                     "Seasonal calculation took %s for dataset %s" % 
(str(datetime.now() - the_time), shortName))
 
@@ -288,12 +310,12 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
             start = (pytz.UTC.localize(beginning_of_month) - 
EPOCH).total_seconds()
             end = (pytz.UTC.localize(end_of_month) - EPOCH).total_seconds()
             tile_stats = 
self._get_tile_service().find_tiles_in_polygon(bounding_polygon, ds, start, end,
-                                                                  fl=('id,'
-                                                                      
'tile_avg_val_d,tile_count_i,'
-                                                                      
'tile_min_val_d,tile_max_val_d,'
-                                                                      
'tile_min_lat,tile_max_lat,'
-                                                                      
'tile_min_lon,tile_max_lon'),
-                                                                  
fetch_data=False)
+                                                                        
fl=('id,'
+                                                                            
'tile_avg_val_d,tile_count_i,'
+                                                                            
'tile_min_val_d,tile_max_val_d,'
+                                                                            
'tile_min_lat,tile_max_lat,'
+                                                                            
'tile_min_lon,tile_max_lon'),
+                                                                        
fetch_data=False)
             if len(tile_stats) == 0:
                 continue
 
@@ -338,8 +360,8 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
         weights = np.array(monthly_counts) / count_sum
 
         return np.average(monthly_averages, None, weights).item(), \
-               np.average(monthly_averages, None, weights).item(), \
-               np.average(monthly_averages, None, weights).item()
+            np.average(monthly_averages, None, weights).item(), \
+            np.average(monthly_averages, None, weights).item()
 
     @lru_cache()
     def get_min_max_date(self, ds=None):

Reply via email to