This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch date-normalization
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git

commit 16dd6256c7235597b813d6533a208abdf074b247
Author: Eamon Ford <[email protected]>
AuthorDate: Tue Dec 22 16:50:01 2020 -0800

    normalizeDates optoin added for timeseriesspark
---
 .../webservice/algorithms_spark/TimeSeriesSpark.py | 90 +++++-----------------
 analysis/webservice/webmodel/NexusRequestObject.py |  5 +-
 analysis/webservice/webmodel/RequestParameters.py  |  3 +-
 3 files changed, 24 insertions(+), 74 deletions(-)

diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py 
b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index 422fdb8..cddbba8 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -20,6 +20,7 @@ import traceback
 from cStringIO import StringIO
 from datetime import datetime
 from functools import partial
+import time
 
 import matplotlib.dates as mdates
 import matplotlib.pyplot as plt
@@ -156,8 +157,9 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
         end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
 
         nparts_requested = request.get_nparts()
+        normalize_dates = request.get_normalize_dates()
 
-        return ds, bounding_polygon, start_seconds_from_epoch, 
end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, 
nparts_requested
+        return ds, bounding_polygon, start_seconds_from_epoch, 
end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, 
nparts_requested, normalize_dates
 
     def calc(self, request, **args):
         """
@@ -167,7 +169,7 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
         :return:
         """
         start_time = datetime.now()
-        ds, bounding_polygon, start_seconds_from_epoch, 
end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, 
nparts_requested = self.parse_arguments(
+        ds, bounding_polygon, start_seconds_from_epoch, 
end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, 
nparts_requested, normalize_dates = self.parse_arguments(
             request)
         metrics_record = self._create_metrics_record()
 
@@ -199,6 +201,7 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
                                          shortName,
                                          self._tile_service_factory,
                                          metrics_record.record_metrics,
+                                         normalize_dates,
                                          spark_nparts=spark_nparts,
                                          sc=self._sc)
 
@@ -221,6 +224,7 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
                                                shortName_clim,
                                                self._tile_service_factory,
                                                metrics_record.record_metrics,
+                                               normalize_dates=False,
                                                spark_nparts=spark_nparts,
                                                sc=self._sc)
                 clim_indexed_by_month = 
{datetime.utcfromtimestamp(result['time']).month: result for result in 
results_clim}
@@ -297,73 +301,6 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
         return res
 
     @lru_cache()
-    def calculate_monthly_average(self, month=None, bounding_polygon_wkt=None, 
ds=None):
-
-        min_date, max_date = self.get_min_max_date(ds=ds)
-
-        monthly_averages, monthly_counts = [], []
-        monthly_mins, monthly_maxes = [], []
-        bounding_polygon = shapely.wkt.loads(bounding_polygon_wkt)
-        for year in range(min_date.year, max_date.year + 1):
-            beginning_of_month = datetime(year, month, 1)
-            end_of_month = datetime(year, month, calendar.monthrange(year, 
month)[1], 23, 59, 59)
-            start = (pytz.UTC.localize(beginning_of_month) - 
EPOCH).total_seconds()
-            end = (pytz.UTC.localize(end_of_month) - EPOCH).total_seconds()
-            tile_stats = 
self._get_tile_service().find_tiles_in_polygon(bounding_polygon, ds, start, end,
-                                                                        
fl=('id,'
-                                                                            
'tile_avg_val_d,tile_count_i,'
-                                                                            
'tile_min_val_d,tile_max_val_d,'
-                                                                            
'tile_min_lat,tile_max_lat,'
-                                                                            
'tile_min_lon,tile_max_lon'),
-                                                                        
fetch_data=False)
-            if len(tile_stats) == 0:
-                continue
-
-            # Split list into tiles on the border of the bounding box and 
tiles completely inside the bounding box.
-            border_tiles, inner_tiles = [], []
-            for tile in tile_stats:
-                inner_tiles.append(tile) if 
bounding_polygon.contains(shapely.geometry.box(tile.bbox.min_lon,
-                                                                               
            tile.bbox.min_lat,
-                                                                               
            tile.bbox.max_lon,
-                                                                               
            tile.bbox.max_lat)) else border_tiles.append(
-                    tile)
-
-            # We can use the stats of the inner tiles directly
-            tile_means = [tile.tile_stats.mean for tile in inner_tiles]
-            tile_mins = [tile.tile_stats.min for tile in inner_tiles]
-            tile_maxes = [tile.tile_stats.max for tile in inner_tiles]
-            tile_counts = [tile.tile_stats.count for tile in inner_tiles]
-
-            # Border tiles need have the data loaded, masked, and stats 
recalculated
-            border_tiles = 
list(self._get_tile_service().fetch_data_for_tiles(*border_tiles))
-            border_tiles = 
self._get_tile_service().mask_tiles_to_polygon(bounding_polygon, border_tiles)
-            for tile in border_tiles:
-                tile.update_stats()
-                tile_means.append(tile.tile_stats.mean)
-                tile_mins.append(tile.tile_stats.min)
-                tile_maxes.append(tile.tile_stats.max)
-                tile_counts.append(tile.tile_stats.count)
-
-            tile_means = np.array(tile_means)
-            tile_mins = np.array(tile_mins)
-            tile_maxes = np.array(tile_maxes)
-            tile_counts = np.array(tile_counts)
-
-            sum_tile_counts = np.sum(tile_counts) * 1.0
-
-            monthly_averages += [np.average(tile_means, None, tile_counts / 
sum_tile_counts).item()]
-            monthly_mins += [np.average(tile_mins, None, tile_counts / 
sum_tile_counts).item()]
-            monthly_maxes += [np.average(tile_maxes, None, tile_counts / 
sum_tile_counts).item()]
-            monthly_counts += [sum_tile_counts]
-
-        count_sum = np.sum(monthly_counts) * 1.0
-        weights = np.array(monthly_counts) / count_sum
-
-        return np.average(monthly_averages, None, weights).item(), \
-            np.average(monthly_averages, None, weights).item(), \
-            np.average(monthly_averages, None, weights).item()
-
-    @lru_cache()
     def get_min_max_date(self, ds=None):
         min_date = pytz.timezone('UTC').localize(
             
datetime.utcfromtimestamp(self._get_tile_service().get_min_time([], ds=ds)))
@@ -512,7 +449,7 @@ class TimeSeriesResults(NexusResults):
         return sio.getvalue()
 
 
-def spark_driver(daysinrange, bounding_polygon, ds, tile_service_factory, 
metrics_callback, fill=-9999.,
+def spark_driver(daysinrange, bounding_polygon, ds, tile_service_factory, 
metrics_callback, normalize_dates, fill=-9999.,
                  spark_nparts=1, sc=None):
     nexus_tiles_spark = [(bounding_polygon.wkt, ds,
                           list(daysinrange_part), fill)
@@ -522,14 +459,20 @@ def spark_driver(daysinrange, bounding_polygon, ds, 
tile_service_factory, metric
     # Launch Spark computations
     rdd = sc.parallelize(nexus_tiles_spark, spark_nparts)
     metrics_callback(partitions=rdd.getNumPartitions())
-    results = rdd.flatMap(partial(calc_average_on_day, tile_service_factory, 
metrics_callback)).collect()
+    results = rdd.flatMap(partial(calc_average_on_day, tile_service_factory, 
metrics_callback, normalize_dates)).collect()
     results = list(itertools.chain.from_iterable(results))
     results = sorted(results, key=lambda entry: entry["time"])
 
     return results, {}
 
 
-def calc_average_on_day(tile_service_factory, metrics_callback, tile_in_spark):
+def normalize_date(time_in_seconds):
+    dt = datetime.utcfromtimestamp(time_in_seconds)
+    normalized_dt = dt.replace(day=1)
+    return int(time.mktime(normalized_dt.timetuple()))
+
+
+def calc_average_on_day(tile_service_factory, metrics_callback, 
normalize_dates, tile_in_spark):
     import shapely.wkt
     from datetime import datetime
     from pytz import timezone
@@ -588,6 +531,9 @@ def calc_average_on_day(tile_service_factory, 
metrics_callback, tile_in_spark):
             data_std = np.ma.std(tile_data_agg)
 
         # Return Stats by day
+        if normalize_dates:
+            timeinseconds = normalize_date(timeinseconds)
+
         stat = {
             'min': data_min,
             'max': data_max,
diff --git a/analysis/webservice/webmodel/NexusRequestObject.py 
b/analysis/webservice/webmodel/NexusRequestObject.py
index f118484..3779cf9 100644
--- a/analysis/webservice/webmodel/NexusRequestObject.py
+++ b/analysis/webservice/webmodel/NexusRequestObject.py
@@ -224,4 +224,7 @@ class NexusRequestObject(StatsComputeOptions):
         return self.get_argument(RequestParameters.PLOT_TYPE, default=default)
 
     def get_nparts(self):
-        return self.get_int_arg(RequestParameters.NPARTS, 0)
\ No newline at end of file
+        return self.get_int_arg(RequestParameters.NPARTS, 0)
+
+    def get_normalize_dates(self):
+        return self.get_boolean_arg(RequestParameters.NORMALIZE_DATES, False)
diff --git a/analysis/webservice/webmodel/RequestParameters.py 
b/analysis/webservice/webmodel/RequestParameters.py
index b043cbe..2fdfa29 100644
--- a/analysis/webservice/webmodel/RequestParameters.py
+++ b/analysis/webservice/webmodel/RequestParameters.py
@@ -20,4 +20,5 @@ class RequestParameters(object):
     PLOT_SERIES = "plotSeries"
     PLOT_TYPE = "plotType"
     NPARTS = "nparts"
-    METADATA_FILTER = "metadataFilter"
\ No newline at end of file
+    METADATA_FILTER = "metadataFilter"
+    NORMALIZE_DATES = "normalizeDates"

Reply via email to