This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch date-normalization
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git

commit 16dd6256c7235597b813d6533a208abdf074b247
Author: Eamon Ford <[email protected]>
AuthorDate: Tue Dec 22 16:50:01 2020 -0800

    normalizeDates option added for timeseriesspark
---
 .../webservice/algorithms_spark/TimeSeriesSpark.py | 90 +++++-----------------
 analysis/webservice/webmodel/NexusRequestObject.py |  5 +-
 analysis/webservice/webmodel/RequestParameters.py  |  3 +-
 3 files changed, 24 insertions(+), 74 deletions(-)

diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index 422fdb8..cddbba8 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -20,6 +20,7 @@ import traceback
 from cStringIO import StringIO
 from datetime import datetime
 from functools import partial
+import time
 
 import matplotlib.dates as mdates
 import matplotlib.pyplot as plt
@@ -156,8 +157,9 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
         end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
 
         nparts_requested = request.get_nparts()
+        normalize_dates = request.get_normalize_dates()
 
-        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested
+        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested, normalize_dates
 
     def calc(self, request, **args):
         """
@@ -167,7 +169,7 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
         :return:
         """
 
         start_time = datetime.now()
-        ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested = self.parse_arguments(
+        ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested, normalize_dates = self.parse_arguments(
             request)
         metrics_record = self._create_metrics_record()
@@ -199,6 +201,7 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
                                          shortName,
                                          self._tile_service_factory,
                                          metrics_record.record_metrics,
+                                         normalize_dates,
                                          spark_nparts=spark_nparts,
                                          sc=self._sc)
 
@@ -221,6 +224,7 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
                                                shortName_clim,
                                                self._tile_service_factory,
                                                metrics_record.record_metrics,
+                                               normalize_dates=False,
                                                spark_nparts=spark_nparts,
                                                sc=self._sc)
                 clim_indexed_by_month = {datetime.utcfromtimestamp(result['time']).month: result for result in results_clim}
@@ -297,73 +301,6 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
         return res
 
     @lru_cache()
-    def calculate_monthly_average(self, month=None, bounding_polygon_wkt=None, ds=None):
-
-        min_date, max_date = self.get_min_max_date(ds=ds)
-
-        monthly_averages, monthly_counts = [], []
-        monthly_mins, monthly_maxes = [], []
-        bounding_polygon = shapely.wkt.loads(bounding_polygon_wkt)
-        for year in range(min_date.year, max_date.year + 1):
-            beginning_of_month = datetime(year, month, 1)
-            end_of_month = datetime(year, month, calendar.monthrange(year, month)[1], 23, 59, 59)
-            start = (pytz.UTC.localize(beginning_of_month) - EPOCH).total_seconds()
-            end = (pytz.UTC.localize(end_of_month) - EPOCH).total_seconds()
-            tile_stats = self._get_tile_service().find_tiles_in_polygon(bounding_polygon, ds, start, end,
-                                                                        fl=('id,'
-                                                                            'tile_avg_val_d,tile_count_i,'
-                                                                            'tile_min_val_d,tile_max_val_d,'
-                                                                            'tile_min_lat,tile_max_lat,'
-                                                                            'tile_min_lon,tile_max_lon'),
-                                                                        fetch_data=False)
-            if len(tile_stats) == 0:
-                continue
-
-            # Split list into tiles on the border of the bounding box and tiles completely inside the bounding box.
-            border_tiles, inner_tiles = [], []
-            for tile in tile_stats:
-                inner_tiles.append(tile) if bounding_polygon.contains(shapely.geometry.box(tile.bbox.min_lon,
-                                                                                           tile.bbox.min_lat,
-                                                                                           tile.bbox.max_lon,
-                                                                                           tile.bbox.max_lat)) else border_tiles.append(
-                    tile)
-
-            # We can use the stats of the inner tiles directly
-            tile_means = [tile.tile_stats.mean for tile in inner_tiles]
-            tile_mins = [tile.tile_stats.min for tile in inner_tiles]
-            tile_maxes = [tile.tile_stats.max for tile in inner_tiles]
-            tile_counts = [tile.tile_stats.count for tile in inner_tiles]
-
-            # Border tiles need have the data loaded, masked, and stats recalculated
-            border_tiles = list(self._get_tile_service().fetch_data_for_tiles(*border_tiles))
-            border_tiles = self._get_tile_service().mask_tiles_to_polygon(bounding_polygon, border_tiles)
-            for tile in border_tiles:
-                tile.update_stats()
-                tile_means.append(tile.tile_stats.mean)
-                tile_mins.append(tile.tile_stats.min)
-                tile_maxes.append(tile.tile_stats.max)
-                tile_counts.append(tile.tile_stats.count)
-
-            tile_means = np.array(tile_means)
-            tile_mins = np.array(tile_mins)
-            tile_maxes = np.array(tile_maxes)
-            tile_counts = np.array(tile_counts)
-
-            sum_tile_counts = np.sum(tile_counts) * 1.0
-
-            monthly_averages += [np.average(tile_means, None, tile_counts / sum_tile_counts).item()]
-            monthly_mins += [np.average(tile_mins, None, tile_counts / sum_tile_counts).item()]
-            monthly_maxes += [np.average(tile_maxes, None, tile_counts / sum_tile_counts).item()]
-            monthly_counts += [sum_tile_counts]
-
-        count_sum = np.sum(monthly_counts) * 1.0
-        weights = np.array(monthly_counts) / count_sum
-
-        return np.average(monthly_averages, None, weights).item(), \
-               np.average(monthly_averages, None, weights).item(), \
-               np.average(monthly_averages, None, weights).item()
-
-    @lru_cache()
     def get_min_max_date(self, ds=None):
         min_date = pytz.timezone('UTC').localize(
             datetime.utcfromtimestamp(self._get_tile_service().get_min_time([], ds=ds)))
@@ -512,7 +449,7 @@ class TimeSeriesResults(NexusResults):
         return sio.getvalue()
 
 
-def spark_driver(daysinrange, bounding_polygon, ds, tile_service_factory, metrics_callback, fill=-9999.,
+def spark_driver(daysinrange, bounding_polygon, ds, tile_service_factory, metrics_callback, normalize_dates, fill=-9999.,
                  spark_nparts=1, sc=None):
     nexus_tiles_spark = [(bounding_polygon.wkt, ds, list(daysinrange_part), fill)
                          for daysinrange_part in np.array_split(daysinrange, spark_nparts)]
@@ -522,14 +459,20 @@ def spark_driver(daysinrange, bounding_polygon, ds, tile_service_factory, metric
     # Launch Spark computations
     rdd = sc.parallelize(nexus_tiles_spark, spark_nparts)
     metrics_callback(partitions=rdd.getNumPartitions())
-    results = rdd.flatMap(partial(calc_average_on_day, tile_service_factory, metrics_callback)).collect()
+    results = rdd.flatMap(partial(calc_average_on_day, tile_service_factory, metrics_callback, normalize_dates)).collect()
     results = list(itertools.chain.from_iterable(results))
     results = sorted(results, key=lambda entry: entry["time"])
 
     return results, {}
 
 
-def calc_average_on_day(tile_service_factory, metrics_callback, tile_in_spark):
+def normalize_date(time_in_seconds):
+    dt = datetime.utcfromtimestamp(time_in_seconds)
+    normalized_dt = dt.replace(day=1)
+    return int(time.mktime(normalized_dt.timetuple()))
+
+
+def calc_average_on_day(tile_service_factory, metrics_callback, normalize_dates, tile_in_spark):
     import shapely.wkt
     from datetime import datetime
     from pytz import timezone
@@ -588,6 +531,9 @@ def calc_average_on_day(tile_service_factory, metrics_callback, tile_in_spark):
         data_std = np.ma.std(tile_data_agg)
 
         # Return Stats by day
+        if normalize_dates:
+            timeinseconds = normalize_date(timeinseconds)
+
         stat = {
             'min': data_min,
             'max': data_max,
diff --git a/analysis/webservice/webmodel/NexusRequestObject.py b/analysis/webservice/webmodel/NexusRequestObject.py
index f118484..3779cf9 100644
--- a/analysis/webservice/webmodel/NexusRequestObject.py
+++ b/analysis/webservice/webmodel/NexusRequestObject.py
@@ -224,4 +224,7 @@ class NexusRequestObject(StatsComputeOptions):
         return self.get_argument(RequestParameters.PLOT_TYPE, default=default)
 
     def get_nparts(self):
-        return self.get_int_arg(RequestParameters.NPARTS, 0)
\ No newline at end of file
+        return self.get_int_arg(RequestParameters.NPARTS, 0)
+
+    def get_normalize_dates(self):
+        return self.get_boolean_arg(RequestParameters.NORMALIZE_DATES, False)
diff --git a/analysis/webservice/webmodel/RequestParameters.py b/analysis/webservice/webmodel/RequestParameters.py
index b043cbe..2fdfa29 100644
--- a/analysis/webservice/webmodel/RequestParameters.py
+++ b/analysis/webservice/webmodel/RequestParameters.py
@@ -20,4 +20,5 @@ class RequestParameters(object):
     PLOT_SERIES = "plotSeries"
     PLOT_TYPE = "plotType"
     NPARTS = "nparts"
-    METADATA_FILTER = "metadataFilter"
\ No newline at end of file
+    METADATA_FILTER = "metadataFilter"
+    NORMALIZE_DATES = "normalizeDates"
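
A minimal standalone sketch of what the new normalizeDates option does: normalize_date() moves a timestamp onto day 1 of its month (the time of day is preserved, since replace() only changes the day field), so that, for example, results from datasets timestamped on different days of the same month can land on a common time value. The function body below is copied from the commit; the imports, the dec_5/dec_22 example values, and the calendar.timegm calls are illustrative additions, not part of the diff.

    import calendar
    import time
    from datetime import datetime

    def normalize_date(time_in_seconds):
        # Snap the timestamp to the first day of its month.
        dt = datetime.utcfromtimestamp(time_in_seconds)
        normalized_dt = dt.replace(day=1)
        # time.mktime interprets the tuple in *local* time, while the
        # timestamp was decoded as UTC above; calendar.timegm would keep
        # the round trip purely in UTC.
        return int(time.mktime(normalized_dt.timetuple()))

    # Any two timestamps in the same month normalize to the same value.
    dec_5 = calendar.timegm((2020, 12, 5, 0, 0, 0, 0, 0, 0))
    dec_22 = calendar.timegm((2020, 12, 22, 0, 0, 0, 0, 0, 0))
    assert normalize_date(dec_5) == normalize_date(dec_22)

On the request side, get_normalize_dates() reads the flag with get_boolean_arg(RequestParameters.NORMALIZE_DATES, False), so normalization stays off unless the client supplies a truthy normalizeDates argument. Note that the climatology comparison in TimeSeriesSpark.py pins normalize_dates=False, since climatology results are already indexed by month.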
