http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py new file mode 100644 index 0000000..1f31267 --- /dev/null +++ b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py @@ -0,0 +1,391 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" +import logging +from datetime import datetime + +import numpy as np +import pytz +from nexustiles.nexustiles import NexusTileService +from shapely import wkt +from shapely.geometry import Polygon + +from webservice.NexusHandler import nexus_handler, SparkHandler +from webservice.webmodel import NexusResults, NexusProcessingException + +SENTINEL = 'STOP' + +EPOCH = pytz.timezone('UTC').localize(datetime(1970, 1, 1)) + + +def iso_time_to_epoch(str_time): + return (datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%SZ").replace( + tzinfo=pytz.UTC) - EPOCH).total_seconds() + + +@nexus_handler +class DailyDifferenceAverageSparkImpl(SparkHandler): + name = "Daily Difference Average Spark" + path = "/dailydifferenceaverage_spark" + description = "Subtracts data in box in Dataset 1 from Dataset 2, then averages the difference per day." + params = { + "dataset": { + "name": "Dataset", + "type": "string", + "description": "The Dataset shortname to use in calculation" + }, + "climatology": { + "name": "Climatology", + "type": "string", + "description": "The Climatology shortname to use in calculation" + }, + "b": { + "name": "Bounding box", + "type": "comma-delimited float", + "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude" + }, + "startTime": { + "name": "Start Time", + "type": "string", + "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ" + }, + "endTime": { + "name": "End Time", + "type": "string", + "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ" + } + } + singleton = True + + def __init__(self): + SparkHandler.__init__(self, skipCassandra=True) + self.log = logging.getLogger(__name__) + + def parse_arguments(self, request): + # Parse input arguments + self.log.debug("Parsing arguments") + try: + bounding_polygon = request.get_bounding_polygon() + except: + try: + minLat = request.get_min_lat() + maxLat = request.get_max_lat() + minLon = request.get_min_lon() + maxLon = request.get_max_lon() + bounding_polygon = Polygon([(minLon, minLat), # (west, south) + (maxLon, minLat), # (east, south) + (maxLon, maxLat), # (east, north) + (minLon, maxLat), # (west, north) + (minLon, minLat)]) # (west, south) + except: + raise NexusProcessingException( + reason="'b' argument or 'minLon', 'minLat', 'maxLon', and 'maxLat' arguments are required. 
If 'b' is used, it must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude", + code=400) + dataset = request.get_argument('dataset', None) + if dataset is None: + dataset = request.get_argument('ds1', None) + if dataset is None: + raise NexusProcessingException(reason="'dataset' or 'ds1' argument is required", code=400) + climatology = request.get_argument('climatology', None) + if climatology is None: + climatology = request.get_argument('ds2', None) + if climatology is None: + raise NexusProcessingException(reason="'climatology' or 'ds2' argument is required", code=400) + + try: + start_time = request.get_start_datetime() + except: + raise NexusProcessingException( + reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", + code=400) + try: + end_time = request.get_end_datetime() + except: + raise NexusProcessingException( + reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", + code=400) + + start_seconds_from_epoch = long((start_time - EPOCH).total_seconds()) + end_seconds_from_epoch = long((end_time - EPOCH).total_seconds()) + + plot = request.get_boolean_arg("plot", default=False) + + return bounding_polygon, dataset, climatology, start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, plot + + def calc(self, request, **args): + bounding_polygon, dataset, climatology, start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, plot = self.parse_arguments( + request) + + self.log.debug("Querying for tiles in search domain") + # Get tile ids in box + tile_ids = [tile.tile_id for tile in + self._tile_service.find_tiles_in_polygon(bounding_polygon, dataset, + start_seconds_from_epoch, end_seconds_from_epoch, + fetch_data=False, fl='id', + sort=['tile_min_time_dt asc', 'tile_min_lon asc', + 'tile_min_lat asc'], rows=5000)] + + # Call spark_matchup + self.log.debug("Calling Spark Driver") + try: + spark_result = spark_anomolies_driver(tile_ids, wkt.dumps(bounding_polygon), dataset, climatology, + sc=self._sc) + except Exception as e: + self.log.exception(e) + raise NexusProcessingException(reason="An unknown error occurred while computing average differences", + code=500) + + average_and_std_by_day = spark_result + + result = DDAResult( + results=[[{'time': dayms, 'mean': avg_std[0], 'std': avg_std[1], 'ds': 0}] for dayms, avg_std in + average_and_std_by_day], + stats={}, + meta=self.get_meta(dataset)) + + min_lon, min_lat, max_lon, max_lat = bounding_polygon.bounds + result.extendMeta(min_lat, max_lat, min_lon, max_lon, "", start_time, end_time) + return result + + def get_meta(self, dataset): + + # TODO yea this is broken + if 'sst' in dataset.lower(): + meta = { + "title": "Sea Surface Temperature (SST) Anomalies", + "description": "SST anomalies are departures from the 5-day pixel mean", + "units": u'\u00B0C', + "label": u'Difference from 5-Day mean (\u00B0C)' + } + elif 'chl' in dataset.lower(): + meta = { + "title": "Chlorophyll Concentration Anomalies", + "description": "Chlorophyll Concentration anomalies are departures from the 5-day pixel mean", + "units": u'mg m^-3', + "label": u'Difference from 5-Day mean (mg m^-3)' + } + elif 'sla' in dataset.lower(): + meta = { + "title": "Sea Level Anomaly Estimate (SLA) Anomalies", + "description": "SLA anomalies are departures from the 5-day pixel mean", + "units": u'm', + "label": 
u'Difference from 5-Day mean (m)' + } + elif 'sss' in dataset.lower(): + meta = { + "title": "Sea Surface Salinity (SSS) Anomalies", + "description": "SSS anomalies are departures from the 5-day pixel mean", + "units": u'psu', + "label": u'Difference from 5-Day mean (psu)' + } + elif 'ccmp' in dataset.lower(): + meta = { + "title": "Wind Speed Anomalies", + "description": "Wind Speed anomalies are departures from the 1-day pixel mean", + "units": u'm/s', + "label": u'Difference from 1-Day mean (m/s)' + } + elif 'trmm' in dataset.lower(): + meta = { + "title": "Precipitation Anomalies", + "description": "Precipitation anomalies are departures from the 5-day pixel mean", + "units": u'mm', + "label": u'Difference from 5-Day mean (mm)' + } + else: + meta = { + "title": "Anomalies", + "description": "Anomalies are departures from the 5-day pixel mean", + "units": u'', + "label": u'Difference from 5-Day mean' + } + return meta + + +class DDAResult(NexusResults): + def toImage(self): + from StringIO import StringIO + import matplotlib.pyplot as plt + from matplotlib.dates import date2num + + times = [date2num(datetime.fromtimestamp(dayavglistdict[0]['time'], pytz.utc).date()) for dayavglistdict in + self.results()] + means = [dayavglistdict[0]['mean'] for dayavglistdict in self.results()] + plt.plot_date(times, means, '|g-') + + plt.xlabel('Date') + plt.xticks(rotation=70) + plt.ylabel(u'Difference from 5-Day mean (\u00B0C)') + plt.title('Sea Surface Temperature (SST) Anomalies') + plt.grid(True) + plt.tight_layout() + + sio = StringIO() + plt.savefig(sio, format='png') + return sio.getvalue() + + +from threading import Lock +from shapely.geometry import box + +DRIVER_LOCK = Lock() + + +class NoClimatologyTile(Exception): + pass + + +class NoDatasetTile(Exception): + pass + + +def determine_parllelism(num_tiles): + """ + Try to stay at a maximum of 1500 tiles per partition; But don't go over 128 partitions. 
+ Also, don't go below the default of 8 + """ + num_partitions = max(min(num_tiles // 1500, 128), 8) + return num_partitions + + +def spark_anomolies_driver(tile_ids, bounding_wkt, dataset, climatology, sc=None): + from functools import partial + + with DRIVER_LOCK: + bounding_wkt_b = sc.broadcast(bounding_wkt) + dataset_b = sc.broadcast(dataset) + climatology_b = sc.broadcast(climatology) + + # Parallelize list of tile ids + rdd = sc.parallelize(tile_ids, determine_parllelism(len(tile_ids))) + + def add_tuple_elements(tuple1, tuple2): + cumulative_sum = tuple1[0] + tuple2[0] + cumulative_count = tuple1[1] + tuple2[1] + + avg_1 = tuple1[0] / tuple1[1] + avg_2 = tuple2[0] / tuple2[1] + + cumulative_var = parallel_variance(avg_1, tuple1[1], tuple1[2], avg_2, tuple2[1], tuple2[2]) + return cumulative_sum, cumulative_count, cumulative_var + + def parallel_variance(avg_a, count_a, var_a, avg_b, count_b, var_b): + # Thanks Wikipedia https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm + delta = avg_b - avg_a + m_a = var_a * (count_a - 1) + m_b = var_b * (count_b - 1) + M2 = m_a + m_b + delta ** 2 * count_a * count_b / (count_a + count_b) + return M2 / (count_a + count_b - 1) + + def compute_avg_and_std(sum_cnt_var_tuple): + return sum_cnt_var_tuple[0] / sum_cnt_var_tuple[1], np.sqrt(sum_cnt_var_tuple[2]) + + result = rdd \ + .mapPartitions(partial(calculate_diff, bounding_wkt=bounding_wkt_b, dataset=dataset_b, + climatology=climatology_b)) \ + .reduceByKey(add_tuple_elements) \ + .mapValues(compute_avg_and_std) \ + .sortByKey() \ + .collect() + + return result + + +def calculate_diff(tile_ids, bounding_wkt, dataset, climatology): + from itertools import chain + + # Construct a list of generators that yield (day, sum, count, variance) + diff_generators = [] + + tile_ids = list(tile_ids) + if len(tile_ids) == 0: + return [] + tile_service = NexusTileService() + + for tile_id in tile_ids: + # Get the dataset tile + try: + dataset_tile = get_dataset_tile(tile_service, wkt.loads(bounding_wkt.value), tile_id) + except NoDatasetTile: + # This should only happen if all measurements in a tile become masked after applying the bounding polygon + continue + + tile_day_of_year = dataset_tile.min_time.timetuple().tm_yday + + # Get the climatology tile + try: + climatology_tile = get_climatology_tile(tile_service, wkt.loads(bounding_wkt.value), + box(dataset_tile.bbox.min_lon, + dataset_tile.bbox.min_lat, + dataset_tile.bbox.max_lon, + dataset_tile.bbox.max_lat), + climatology.value, + tile_day_of_year) + except NoClimatologyTile: + continue + + diff_generators.append(generate_diff(dataset_tile, climatology_tile)) + + return chain(*diff_generators) + + +def get_dataset_tile(tile_service, search_bounding_shape, tile_id): + the_time = datetime.now() + + try: + # Load the dataset tile + dataset_tile = tile_service.find_tile_by_id(tile_id)[0] + # Mask it to the search domain + dataset_tile = tile_service.mask_tiles_to_polygon(search_bounding_shape, [dataset_tile])[0] + except IndexError: + raise NoDatasetTile() + + print "%s Time to load dataset tile %s" % (str(datetime.now() - the_time), dataset_tile.tile_id) + return dataset_tile + + +def get_climatology_tile(tile_service, search_bounding_shape, tile_bounding_shape, climatology_dataset, + tile_day_of_year): + the_time = datetime.now() + try: + # Load the tile + climatology_tile = tile_service.find_tile_by_polygon_and_most_recent_day_of_year( + tile_bounding_shape, + climatology_dataset, + tile_day_of_year)[0] + + except 
IndexError: + raise NoClimatologyTile() + + if search_bounding_shape.contains(tile_bounding_shape): + # The tile is totally contained in the search area, so we don't need to mask it. + pass + else: + # The tile is not totally contained in the search area, + # so we need to mask the data to the search domain. + try: + # Mask it to the search domain + climatology_tile = tile_service.mask_tiles_to_polygon(search_bounding_shape, [climatology_tile])[0] + except IndexError: + raise NoClimatologyTile() + + print "%s Time to load climatology tile %s" % (str(datetime.now() - the_time), climatology_tile.tile_id) + return climatology_tile + + +def generate_diff(data_tile, climatology_tile): + the_time = datetime.now() + + diff = np.subtract(data_tile.data, climatology_tile.data) + diff_sum = np.nansum(diff) + diff_var = np.nanvar(diff) + diff_ct = np.ma.count(diff) + + date_in_seconds = int((datetime.combine(data_tile.min_time.date(), datetime.min.time()).replace( + tzinfo=pytz.UTC) - EPOCH).total_seconds()) + + print "%s Time to generate diff between %s and %s" % ( + str(datetime.now() - the_time), data_tile.tile_id, climatology_tile.tile_id) + + yield (date_in_seconds, (diff_sum, diff_ct, diff_var))
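The reduceByKey step in spark_anomolies_driver combines per-partition (sum, count, variance) tuples using the pairwise update from the parallel variance algorithm cited in parallel_variance. Below is a minimal standalone sketch of that combination, checked against a direct NumPy computation. It assumes each partial variance is a sample variance (ddof=1); note np.nanvar in generate_diff defaults to ddof=0, so for small counts the combined values in the code above are approximate.

from functools import reduce

import numpy as np


def combine(t1, t2):
    # t = (sum, count, sample variance) for one chunk of data,
    # mirroring add_tuple_elements/parallel_variance above
    s1, n1, v1 = t1
    s2, n2, v2 = t2
    delta = s2 / n2 - s1 / n1
    m2 = v1 * (n1 - 1) + v2 * (n2 - 1) + delta ** 2 * n1 * n2 / (n1 + n2)
    return s1 + s2, n1 + n2, m2 / (n1 + n2 - 1)


data = np.random.RandomState(0).rand(1000)
chunks = np.array_split(data, 4)  # stand-in for per-partition partial results
total_sum, total_cnt, total_var = reduce(combine, [(c.sum(), c.size, c.var(ddof=1)) for c in chunks])
assert np.isclose(total_sum / total_cnt, data.mean())
assert np.isclose(np.sqrt(total_var), data.std(ddof=1))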
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/HofMoellerSpark.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py new file mode 100644 index 0000000..f6a0118 --- /dev/null +++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py @@ -0,0 +1,331 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" +import itertools +import logging +import traceback +from cStringIO import StringIO +from datetime import datetime +from multiprocessing.dummy import Pool, Manager + +import matplotlib.pyplot as plt +import mpld3 +import numpy as np +from nexustiles.nexustiles import NexusTileService +from matplotlib import cm +from matplotlib.ticker import FuncFormatter + +from webservice.NexusHandler import SparkHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC +from webservice.webmodel import NexusProcessingException, NexusResults + +SENTINEL = 'STOP' +LATITUDE = 0 +LONGITUDE = 1 + + +class LongitudeHofMoellerCalculator(object): + @staticmethod + def longitude_time_hofmoeller_stats(tile_in_spark): + + (tile_id, index, min_lat, max_lat, min_lon, max_lon) = tile_in_spark + + tile_service = NexusTileService() + try: + # Load the dataset tile + tile = tile_service.find_tile_by_id(tile_id)[0] + # Mask it to the search domain + tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat, min_lon, max_lon, [tile])[0] + except IndexError: + return None + + stat = { + 'sequence': index, + 'time': np.ma.min(tile.times), + 'lons': [] + } + points = list(tile.nexus_point_generator()) + data = sorted(points, key=lambda p: p.longitude) + points_by_lon = itertools.groupby(data, key=lambda p: p.longitude) + + for lon, points_at_lon in points_by_lon: + values_at_lon = np.array([point.data_val for point in points_at_lon]) + stat['lons'].append({ + 'longitude': float(lon), + 'cnt': len(values_at_lon), + 'avg': np.mean(values_at_lon).item(), + 'max': np.max(values_at_lon).item(), + 'min': np.min(values_at_lon).item(), + 'std': np.std(values_at_lon).item() + }) + + return stat + + +class LatitudeHofMoellerCalculator(object): + @staticmethod + def latitude_time_hofmoeller_stats(tile_in_spark): + + (tile_id, index, min_lat, max_lat, min_lon, max_lon) = tile_in_spark + + tile_service = NexusTileService() + try: + # Load the dataset tile + tile = tile_service.find_tile_by_id(tile_id)[0] + # Mask it to the search domain + tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat, min_lon, max_lon, [tile])[0] + except IndexError: + return None + + stat = { + 'sequence': index, + 'time': np.ma.min(tile.times), + 'lats': [] + } + + points = list(tile.nexus_point_generator()) + data = sorted(points, key=lambda p: p.latitude) + points_by_lat = itertools.groupby(data, key=lambda p: p.latitude) + + for lat, points_at_lat in points_by_lat: + values_at_lat = np.array([point.data_val for point in points_at_lat]) + + stat['lats'].append({ + 'latitude': float(lat), + 'cnt': len(values_at_lat), + 'avg': np.mean(values_at_lat).item(), + 'max': np.max(values_at_lat).item(), + 'min': np.min(values_at_lat).item(), + 'std': np.std(values_at_lat).item() + }) + + return stat + + +class BaseHoffMoellerHandlerImpl(SparkHandler): + def __init__(self): + SparkHandler.__init__(self) + self.log = logging.getLogger(__name__) + + def applyDeseasonToHofMoellerByField(self, results, pivot="lats", 
field="avg", append=True): + shape = (len(results), len(results[0][pivot])) + if shape[0] <= 12: + return results + for a in range(0, 12): + values = [] + for b in range(a, len(results), 12): + values.append(np.average([l[field] for l in results[b][pivot]])) + avg = np.average(values) + + for b in range(a, len(results), 12): + for l in results[b][pivot]: + l["%sSeasonal" % field] = l[field] - avg + + return results + + def applyDeseasonToHofMoeller(self, results, pivot="lats", append=True): + results = self.applyDeseasonToHofMoellerByField(results, pivot, field="avg", append=append) + results = self.applyDeseasonToHofMoellerByField(results, pivot, field="min", append=append) + results = self.applyDeseasonToHofMoellerByField(results, pivot, field="max", append=append) + return results + +def determine_parllelism(num_tiles): + """ + Try to stay at a maximum of 1500 tiles per partition; But don't go over 128 partitions. + Also, don't go below the default of 8 + """ + num_partitions = max(min(num_tiles // 1500, 128), 8) + return num_partitions + +@nexus_handler +class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl): + name = "Latitude/Time HofMoeller Spark" + path = "/latitudeTimeHofMoellerSpark" + description = "Computes a latitude/time HofMoeller plot given an arbitrary geographical area and time range" + params = DEFAULT_PARAMETERS_SPEC + singleton = True + + def __init__(self): + BaseHoffMoellerHandlerImpl.__init__(self) + + def calc(self, computeOptions, **args): + nexus_tiles_spark = [(tile.tile_id, x, computeOptions.get_min_lat(), computeOptions.get_max_lat(), computeOptions.get_min_lon(), computeOptions.get_max_lon()) for x, tile in enumerate(self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(), + computeOptions.get_min_lon(), computeOptions.get_max_lon(), + computeOptions.get_dataset()[0], + computeOptions.get_start_time(), + computeOptions.get_end_time(), fetch_data=False))] + + if len(nexus_tiles_spark) == 0: + raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe") + + # Parallelize list of tile ids + rdd = self._sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark))) + results = rdd.map(LatitudeHofMoellerCalculator.latitude_time_hofmoeller_stats).collect() + + results = filter(None, results) + results = sorted(results, key=lambda entry: entry["time"]) + + results = self.applyDeseasonToHofMoeller(results) + + result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LATITUDE) + return result + + +@nexus_handler +class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl): + name = "Longitude/Time HofMoeller Spark" + path = "/longitudeTimeHofMoellerSpark" + description = "Computes a longitude/time HofMoeller plot given an arbitrary geographical area and time range" + params = DEFAULT_PARAMETERS_SPEC + singleton = True + + def __init__(self): + BaseHoffMoellerHandlerImpl.__init__(self) + + def calc(self, computeOptions, **args): + nexus_tiles_spark = [(tile.tile_id, x, computeOptions.get_min_lat(), computeOptions.get_max_lat(), computeOptions.get_min_lon(), computeOptions.get_max_lon()) for x, tile in enumerate(self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(), + computeOptions.get_min_lon(), computeOptions.get_max_lon(), + computeOptions.get_dataset()[0], + computeOptions.get_start_time(), + computeOptions.get_end_time(), fetch_data=False))] + + if 
len(nexus_tiles_spark) == 0: + raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe") + + # Parallelize list of tile ids + rdd = self._sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark))) + results = rdd.map(LongitudeHofMoellerCalculator.longitude_time_hofmoeller_stats).collect() + + results = filter(None, results) + results = sorted(results, key=lambda entry: entry["time"]) + + results = self.applyDeseasonToHofMoeller(results, pivot="lons") + + result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LONGITUDE) + return result + + +class HoffMoellerResults(NexusResults): + LATITUDE = 0 + LONGITUDE = 1 + + def __init__(self, results=None, meta=None, stats=None, computeOptions=None, **args): + NexusResults.__init__(self, results=results, meta=meta, stats=stats, computeOptions=computeOptions) + self.__type = args['type'] + + def createHoffmueller(self, data, coordSeries, timeSeries, coordName, title, interpolate='nearest'): + cmap = cm.coolwarm + # ls = LightSource(315, 45) + # rgb = ls.shade(data, cmap) + + fig, ax = plt.subplots() + fig.set_size_inches(11.0, 8.5) + cax = ax.imshow(data, interpolation=interpolate, cmap=cmap) + + def yFormatter(y, pos): + if y < len(coordSeries): + return "%s $^\circ$" % (int(coordSeries[int(y)] * 100.0) / 100.) + else: + return "" + + def xFormatter(x, pos): + if x < len(timeSeries): + return timeSeries[int(x)].strftime('%b %Y') + else: + return "" + + ax.xaxis.set_major_formatter(FuncFormatter(xFormatter)) + ax.yaxis.set_major_formatter(FuncFormatter(yFormatter)) + + ax.set_title(title) + ax.set_ylabel(coordName) + ax.set_xlabel('Date') + + fig.colorbar(cax) + fig.autofmt_xdate() + + labels = ['point {0}'.format(i + 1) for i in range(len(data))] + # plugins.connect(fig, plugins.MousePosition(fontsize=14)) + tooltip = mpld3.plugins.PointLabelTooltip(cax, labels=labels) + + sio = StringIO() + plt.savefig(sio, format='png') + return sio.getvalue() + + def createLongitudeHoffmueller(self, res, meta): + lonSeries = [m['longitude'] for m in res[0]['lons']] + timeSeries = [datetime.fromtimestamp(m['time'] / 1000) for m in res] + + data = np.zeros((len(lonSeries), len(timeSeries))) + + plotSeries = self.computeOptions().get_plot_series(default="avg") if self.computeOptions is not None else None + if plotSeries is None: + plotSeries = "avg" + + for t in range(0, len(timeSeries)): + timeSet = res[t] + for l in range(0, len(lonSeries)): + latSet = timeSet['lons'][l] + value = latSet[plotSeries] + data[len(lonSeries) - l - 1][t] = value + + title = meta['title'] + source = meta['source'] + dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y')) + + return self.createHoffmueller(data, lonSeries, timeSeries, "Longitude", + "%s\n%s\n%s" % (title, source, dateRange), interpolate='nearest') + + def createLatitudeHoffmueller(self, res, meta): + latSeries = [m['latitude'] for m in res[0]['lats']] + timeSeries = [datetime.fromtimestamp(m['time'] / 1000) for m in res] + + data = np.zeros((len(latSeries), len(timeSeries))) + + plotSeries = self.computeOptions().get_plot_series(default="avg") if self.computeOptions is not None else None + if plotSeries is None: + plotSeries = "avg" + + for t in range(0, len(timeSeries)): + timeSet = res[t] + for l in range(0, len(latSeries)): + latSet = timeSet['lats'][l] + value = latSet[plotSeries] + data[len(latSeries) - l - 1][t] = value + + title = meta['title'] + source = meta['source'] + 
dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y')) + + return self.createHoffmueller(data, latSeries, timeSeries, "Latitude", + title="%s\n%s\n%s" % (title, source, dateRange), interpolate='nearest') + + def toImage(self): + res = self.results() + meta = self.meta() + + if self.__type == HoffMoellerResults.LATITUDE: + return self.createLatitudeHoffmueller(res, meta) + elif self.__type == HoffMoellerResults.LONGITUDE: + return self.createLongitudeHoffmueller(res, meta) + else: + raise Exception("Unsupported HoffMoeller Plot Type") + + +def pool_worker(type, work_queue, done_queue): + try: + + if type == LATITUDE: + calculator = LatitudeHofMoellerCalculator() + elif type == LONGITUDE: + calculator = LongitudeHofMoellerCalculator() + + for work in iter(work_queue.get, SENTINEL): + scifunction = work[0] + args = work[1:] + result = calculator.__getattribute__(scifunction)(*args) + done_queue.put(result) + + except Exception as e: + e_str = traceback.format_exc(e) + done_queue.put({'error': e_str}) http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/Matchup.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py new file mode 100644 index 0000000..8d90389 --- /dev/null +++ b/analysis/webservice/algorithms_spark/Matchup.py @@ -0,0 +1,691 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" + +import json +import logging +import threading +import time +from datetime import datetime +from itertools import chain +from math import cos, radians + +import numpy as np +import pyproj +import requests +from nexustiles.nexustiles import NexusTileService +from pytz import timezone, UTC +from scipy import spatial +from shapely import wkt +from shapely.geometry import Point +from shapely.geometry import box +from shapely.geos import ReadingError + +from webservice.NexusHandler import SparkHandler, nexus_handler +from webservice.algorithms.doms import config as edge_endpoints +from webservice.algorithms.doms import values as doms_values +from webservice.algorithms.doms.BaseDomsHandler import DomsQueryResults +from webservice.algorithms.doms.ResultsStorage import ResultsStorage +from webservice.webmodel import NexusProcessingException + +EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) +ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' + + +def iso_time_to_epoch(str_time): + return (datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%SZ").replace( + tzinfo=UTC) - EPOCH).total_seconds() + + +@nexus_handler +class Matchup(SparkHandler): + name = "Matchup" + path = "/match_spark" + description = "Match measurements between two or more datasets" + + params = { + "primary": { + "name": "Primary Dataset", + "type": "string", + "description": "The Primary dataset used to find matches for. Required" + }, + "matchup": { + "name": "Match-Up Datasets", + "type": "comma-delimited string", + "description": "The Dataset(s) being searched for measurements that match the Primary. Required" + }, + "parameter": { + "name": "Match-Up Parameter", + "type": "string", + "description": "The parameter of interest used for the match up. One of 'sst', 'sss', 'wind'. Required" + }, + "startTime": { + "name": "Start Time", + "type": "string", + "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. 
Required" + }, + "endTime": { + "name": "End Time", + "type": "string", + "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required" + }, + "b": { + "name": "Bounding box", + "type": "comma-delimited float", + "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, " + "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required" + }, + "depthMin": { + "name": "Minimum Depth", + "type": "float", + "description": "Minimum depth of measurements. Must be less than depthMax. Optional. Default: no limit" + }, + "depthMax": { + "name": "Maximum Depth", + "type": "float", + "description": "Maximum depth of measurements. Must be greater than depthMin. Optional. Default: no limit" + }, + "tt": { + "name": "Time Tolerance", + "type": "long", + "description": "Tolerance in time (seconds) when comparing two measurements. Optional. Default: 86400" + }, + "rt": { + "name": "Radius Tolerance", + "type": "float", + "description": "Tolerance in radius (meters) when comparing two measurements. Optional. Default: 1000" + }, + "platforms": { + "name": "Platforms", + "type": "comma-delimited integer", + "description": "Platforms to include for matchup consideration. Required" + }, + "matchOnce": { + "name": "Match Once", + "type": "boolean", + "description": "Optional True/False flag used to determine if more than one match per primary point is returned. " + + "If true, only the nearest point will be returned for each primary point. " + + "If false, all points within the tolerances will be returned for each primary point. Default: False" + }, + "resultSizeLimit": { + "name": "Result Size Limit", + "type": "int", + "description": "Optional integer value that limits the number of results returned from the matchup. " + "If the number of primary matches is greater than this limit, the service will respond with " + "(HTTP 202: Accepted) and an empty response body. A value of 0 means return all results. " + "Default: 500" + } + } + singleton = True + + def __init__(self): + SparkHandler.__init__(self, skipCassandra=True) + self.log = logging.getLogger(__name__) + + def parse_arguments(self, request): + # Parse input arguments + self.log.debug("Parsing arguments") + try: + bounding_polygon = request.get_bounding_polygon() + except: + raise NexusProcessingException( + reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude", + code=400) + primary_ds_name = request.get_argument('primary', None) + if primary_ds_name is None: + raise NexusProcessingException(reason="'primary' argument is required", code=400) + matchup_ds_names = request.get_argument('matchup', None) + if matchup_ds_names is None: + raise NexusProcessingException(reason="'matchup' argument is required", code=400) + + parameter_s = request.get_argument('parameter', 'sst') + if parameter_s not in ['sst', 'sss', 'wind']: + raise NexusProcessingException( + reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400) + + try: + start_time = request.get_start_datetime() + except: + raise NexusProcessingException( + reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", + code=400) + try: + end_time = request.get_end_datetime() + except: + raise NexusProcessingException( + reason="'endTime' argument is required. 
Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", + code=400) + + if start_time > end_time: + raise NexusProcessingException( + reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % ( + request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)), + code=400) + + depth_min = request.get_decimal_arg('depthMin', default=None) + depth_max = request.get_decimal_arg('depthMax', default=None) + + if depth_min is not None and depth_max is not None and depth_min >= depth_max: + raise NexusProcessingException( + reason="Depth Min should be less than Depth Max", code=400) + + time_tolerance = request.get_int_arg('tt', default=86400) + radius_tolerance = request.get_decimal_arg('rt', default=1000.0) + platforms = request.get_argument('platforms', None) + if platforms is None: + raise NexusProcessingException(reason="'platforms' argument is required", code=400) + try: + p_validation = platforms.split(',') + p_validation = [int(p) for p in p_validation] + del p_validation + except: + raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400) + + match_once = request.get_boolean_arg("matchOnce", default=False) + + result_size_limit = request.get_int_arg("resultSizeLimit", default=500) + + start_seconds_from_epoch = long((start_time - EPOCH).total_seconds()) + end_seconds_from_epoch = long((end_time - EPOCH).total_seconds()) + + return bounding_polygon, primary_ds_name, matchup_ds_names, parameter_s, \ + start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \ + depth_min, depth_max, time_tolerance, radius_tolerance, \ + platforms, match_once, result_size_limit + + def calc(self, request, **args): + start = datetime.utcnow() + # TODO Assuming Satellite primary + bounding_polygon, primary_ds_name, matchup_ds_names, parameter_s, \ + start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \ + depth_min, depth_max, time_tolerance, radius_tolerance, \ + platforms, match_once, result_size_limit = self.parse_arguments(request) + + with ResultsStorage() as resultsStorage: + + execution_id = str(resultsStorage.insertExecution(None, start, None, None)) + + self.log.debug("Querying for tiles in search domain") + # Get tile ids in box + tile_ids = [tile.tile_id for tile in + self._tile_service.find_tiles_in_polygon(bounding_polygon, primary_ds_name, + start_seconds_from_epoch, end_seconds_from_epoch, + fetch_data=False, fl='id', + sort=['tile_min_time_dt asc', 'tile_min_lon asc', + 'tile_min_lat asc'], rows=5000)] + + # Call spark_matchup + self.log.debug("Calling Spark Driver") + try: + spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name, + matchup_ds_names, parameter_s, depth_min, depth_max, time_tolerance, + radius_tolerance, platforms, match_once, sc=self._sc) + except Exception as e: + self.log.exception(e) + raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500) + + end = datetime.utcnow() + + self.log.debug("Building and saving results") + args = { + "primary": primary_ds_name, + "matchup": matchup_ds_names, + "startTime": start_time, + "endTime": end_time, + "bbox": request.get_argument('b'), + "timeTolerance": time_tolerance, + "radiusTolerance": float(radius_tolerance), + "platforms": platforms, + "parameter": parameter_s + } + + if depth_min is not None: + args["depthMin"] = float(depth_min) + + if depth_max is not None: + args["depthMax"] = 
float(depth_max) + + total_keys = len(spark_result.keys()) + total_values = sum(len(v) for v in spark_result.itervalues()) + details = { + "timeToComplete": int((end - start).total_seconds()), + "numInSituRecords": 0, + "numInSituMatched": total_values, + "numGriddedChecked": 0, + "numGriddedMatched": total_keys + } + + matches = Matchup.convert_to_matches(spark_result) + + def do_result_insert(): + with ResultsStorage() as storage: + storage.insertResults(results=matches, params=args, stats=details, + startTime=start, completeTime=end, userEmail="", + execution_id=execution_id) + + threading.Thread(target=do_result_insert).start() + + if 0 < result_size_limit < len(matches): + result = DomsQueryResults(results=None, args=args, details=details, bounds=None, count=None, + computeOptions=None, executionId=execution_id, status_code=202) + else: + result = DomsQueryResults(results=matches, args=args, details=details, bounds=None, count=None, + computeOptions=None, executionId=execution_id) + + return result + + @classmethod + def convert_to_matches(cls, spark_result): + matches = [] + for primary_domspoint, matched_domspoints in spark_result.iteritems(): + p_matched = [cls.domspoint_to_dict(p_match) for p_match in matched_domspoints] + + primary = cls.domspoint_to_dict(primary_domspoint) + primary['matches'] = list(p_matched) + matches.append(primary) + return matches + + @staticmethod + def domspoint_to_dict(domspoint): + return { + "sea_water_temperature": domspoint.sst, + "sea_water_temperature_depth": domspoint.sst_depth, + "sea_water_salinity": domspoint.sss, + "sea_water_salinity_depth": domspoint.sss_depth, + "wind_speed": domspoint.wind_speed, + "wind_direction": domspoint.wind_direction, + "wind_u": domspoint.wind_u, + "wind_v": domspoint.wind_v, + "platform": doms_values.getPlatformById(domspoint.platform), + "device": doms_values.getDeviceById(domspoint.device), + "x": str(domspoint.longitude), + "y": str(domspoint.latitude), + "point": "Point(%s %s)" % (domspoint.longitude, domspoint.latitude), + "time": datetime.strptime(domspoint.time, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC), + "fileurl": domspoint.file_url, + "id": domspoint.data_id, + "source": domspoint.source, + } + + +class DomsPoint(object): + def __init__(self, longitude=None, latitude=None, time=None, depth=None, data_id=None): + + self.time = time + self.longitude = longitude + self.latitude = latitude + self.depth = depth + self.data_id = data_id + + self.wind_u = None + self.wind_v = None + self.wind_direction = None + self.wind_speed = None + self.sst = None + self.sst_depth = None + self.sss = None + self.sss_depth = None + self.source = None + self.depth = None + self.platform = None + self.device = None + self.file_url = None + + def __repr__(self): + return str(self.__dict__) + + @staticmethod + def from_nexus_point(nexus_point, tile=None, parameter='sst'): + point = DomsPoint() + + point.data_id = "%s[%s]" % (tile.tile_id, nexus_point.index) + + # TODO Not an ideal solution; but it works for now. 
+ if parameter == 'sst': + point.sst = nexus_point.data_val.item() + elif parameter == 'sss': + point.sss = nexus_point.data_val.item() + elif parameter == 'wind': + point.wind_u = nexus_point.data_val.item() + try: + point.wind_v = tile.meta_data['wind_v'][tuple(nexus_point.index)].item() + except (KeyError, IndexError): + pass + try: + point.wind_direction = tile.meta_data['wind_dir'][tuple(nexus_point.index)].item() + except (KeyError, IndexError): + pass + try: + point.wind_speed = tile.meta_data['wind_speed'][tuple(nexus_point.index)].item() + except (KeyError, IndexError): + pass + else: + raise NotImplementedError('%s not supported. Only sst, sss, and wind parameters are supported.' % parameter) + + point.longitude = nexus_point.longitude.item() + point.latitude = nexus_point.latitude.item() + + point.time = datetime.utcfromtimestamp(nexus_point.time).strftime('%Y-%m-%dT%H:%M:%SZ') + + try: + point.depth = nexus_point.depth + except KeyError: + # No depth associated with this measurement + pass + + point.sst_depth = 0 + point.source = tile.dataset + point.file_url = tile.granule + + # TODO device should change based on the satellite making the observations. + point.platform = 9 + point.device = 5 + return point + + @staticmethod + def from_edge_point(edge_point): + point = DomsPoint() + + try: + x, y = wkt.loads(edge_point['point']).coords[0] + except ReadingError: + try: + x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0] + except ValueError: + y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0] + + point.longitude = x + point.latitude = y + + point.time = edge_point['time'] + + point.wind_u = edge_point.get('eastward_wind') + point.wind_v = edge_point.get('northward_wind') + point.wind_direction = edge_point.get('wind_direction') + point.wind_speed = edge_point.get('wind_speed') + point.sst = edge_point.get('sea_water_temperature') + point.sst_depth = edge_point.get('sea_water_temperature_depth') + point.sss = edge_point.get('sea_water_salinity') + point.sss_depth = edge_point.get('sea_water_salinity_depth') + point.source = edge_point.get('source') + point.platform = edge_point.get('platform') + point.device = edge_point.get('device') + point.file_url = edge_point.get('fileurl') + + try: + point.data_id = unicode(edge_point['id']) + except KeyError: + point.data_id = "%s:%s:%s" % (point.time, point.longitude, point.latitude) + + return point + + +from threading import Lock + +DRIVER_LOCK = Lock() + + +def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, matchup_ds_names, parameter, depth_min, depth_max, + time_tolerance, radius_tolerance, platforms, match_once, sc=None): + from functools import partial + + with DRIVER_LOCK: + # Broadcast parameters + primary_b = sc.broadcast(primary_ds_name) + matchup_b = sc.broadcast(matchup_ds_names) + depth_min_b = sc.broadcast(float(depth_min) if depth_min is not None else None) + depth_max_b = sc.broadcast(float(depth_max) if depth_max is not None else None) + tt_b = sc.broadcast(time_tolerance) + rt_b = sc.broadcast(float(radius_tolerance)) + platforms_b = sc.broadcast(platforms) + bounding_wkt_b = sc.broadcast(bounding_wkt) + parameter_b = sc.broadcast(parameter) + + # Parallelize list of tile ids + rdd = sc.parallelize(tile_ids, determine_parllelism(len(tile_ids))) + + # Map Partitions ( list(tile_id) ) + rdd_filtered = rdd.mapPartitions( + partial(match_satellite_to_insitu, primary_b=primary_b, matchup_b=matchup_b, parameter_b=parameter_b, tt_b=tt_b, + rt_b=rt_b, 
platforms_b=platforms_b, bounding_wkt_b=bounding_wkt_b, depth_min_b=depth_min_b, + depth_max_b=depth_max_b), preservesPartitioning=True) \ + .filter(lambda p_m_tuple: abs( + iso_time_to_epoch(p_m_tuple[0].time) - iso_time_to_epoch(p_m_tuple[1].time)) <= time_tolerance) + + if match_once: + # Only the 'nearest' point for each primary should be returned. Add an extra map/reduce which calculates + # the distance and finds the minimum + + # Method used for calculating the distance between 2 DomsPoints + from pyproj import Geod + + def dist(primary, matchup): + wgs84_geod = Geod(ellps='WGS84') + lat1, lon1 = (primary.latitude, primary.longitude) + lat2, lon2 = (matchup.latitude, matchup.longitude) + az12, az21, distance = wgs84_geod.inv(lon1, lat1, lon2, lat2) + return distance + + rdd_filtered = rdd_filtered \ + .map(lambda (primary, matchup): tuple([primary, tuple([matchup, dist(primary, matchup)])])) \ + .reduceByKey(lambda match_1, match_2: match_1 if match_1[1] < match_2[1] else match_2) \ + .mapValues(lambda x: [x[0]]) + else: + rdd_filtered = rdd_filtered \ + .combineByKey(lambda value: [value], # Create 1 element list + lambda value_list, value: value_list + [value], # Add 1 element to list + lambda value_list_a, value_list_b: value_list_a + value_list_b) # Add two lists together + + result_as_map = rdd_filtered.collectAsMap() + + return result_as_map + + +def determine_parllelism(num_tiles): + """ + Try to stay at a maximum of 140 tiles per partition; But don't go over 128 partitions. + Also, don't go below the default of 8 + """ + num_partitions = max(min(num_tiles / 140, 128), 8) + return num_partitions + + +def add_meters_to_lon_lat(lon, lat, meters): + """ + Uses a simple approximation of + 1 degree latitude = 111,111 meters + 1 degree longitude = 111,111 meters * cosine(latitude) + :param lon: longitude to add meters to + :param lat: latitude to add meters to + :param meters: meters to add to the longitude and latitude values + :return: (longitude, latitude) increased by given meters + """ + longitude = lon + ((meters / 111111) * cos(radians(lat))) + latitude = lat + (meters / 111111) + + return longitude, latitude + + +def match_satellite_to_insitu(tile_ids, primary_b, matchup_b, parameter_b, tt_b, rt_b, platforms_b, + bounding_wkt_b, depth_min_b, depth_max_b): + the_time = datetime.now() + tile_ids = list(tile_ids) + if len(tile_ids) == 0: + return [] + tile_service = NexusTileService() + + # Determine the spatial temporal extents of this partition of tiles + tiles_bbox = tile_service.get_bounding_box(tile_ids) + tiles_min_time = tile_service.get_min_time(tile_ids) + tiles_max_time = tile_service.get_max_time(tile_ids) + + # Increase spatial extents by the radius tolerance + matchup_min_lon, matchup_min_lat = add_meters_to_lon_lat(tiles_bbox.bounds[0], tiles_bbox.bounds[1], + -1 * rt_b.value) + matchup_max_lon, matchup_max_lat = add_meters_to_lon_lat(tiles_bbox.bounds[2], tiles_bbox.bounds[3], rt_b.value) + + # Don't go outside of the search domain + search_min_x, search_min_y, search_max_x, search_max_y = wkt.loads(bounding_wkt_b.value).bounds + matchup_min_lon = max(matchup_min_lon, search_min_x) + matchup_min_lat = max(matchup_min_lat, search_min_y) + matchup_max_lon = min(matchup_max_lon, search_max_x) + matchup_max_lat = min(matchup_max_lat, search_max_y) + + # Find the centroid of the matchup bounding box and initialize the projections + matchup_center = box(matchup_min_lon, matchup_min_lat, matchup_max_lon, matchup_max_lat).centroid.coords[0] + aeqd_proj = 
pyproj.Proj(proj='aeqd', lon_0=matchup_center[0], lat_0=matchup_center[1]) + lonlat_proj = pyproj.Proj(proj='lonlat') + + # Increase temporal extents by the time tolerance + matchup_min_time = tiles_min_time - tt_b.value + matchup_max_time = tiles_max_time + tt_b.value + print "%s Time to determine spatial-temporal extents for partition %s to %s" % ( + str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]) + + # Query edge for all points within the spatial-temporal extents of this partition + the_time = datetime.now() + edge_session = requests.Session() + edge_results = [] + with edge_session: + for insitudata_name in matchup_b.value.split(','): + bbox = ','.join( + [str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon), str(matchup_max_lat)]) + edge_response = query_edge(insitudata_name, parameter_b.value, matchup_min_time, matchup_max_time, bbox, + platforms_b.value, depth_min_b.value, depth_max_b.value, session=edge_session) + if edge_response['totalResults'] == 0: + continue + r = edge_response['results'] + for p in r: + p['source'] = insitudata_name + edge_results.extend(r) + print "%s Time to call edge for partition %s to %s" % (str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]) + if len(edge_results) == 0: + return [] + + # Convert edge points to utm + the_time = datetime.now() + matchup_points = np.ndarray((len(edge_results), 2), dtype=np.float32) + for n, edge_point in enumerate(edge_results): + try: + x, y = wkt.loads(edge_point['point']).coords[0] + except ReadingError: + try: + x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0] + except ValueError: + y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0] + + matchup_points[n][0], matchup_points[n][1] = pyproj.transform(p1=lonlat_proj, p2=aeqd_proj, x=x, y=y) + print "%s Time to convert match points for partition %s to %s" % ( + str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]) + + # Build kdtree from matchup points + the_time = datetime.now() + m_tree = spatial.cKDTree(matchup_points, leafsize=30) + print "%s Time to build matchup tree" % (str(datetime.now() - the_time)) + + # The actual matching happens in the generator. 
This is so that we only load 1 tile into memory at a time + match_generators = [match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, bounding_wkt_b.value, + parameter_b.value, rt_b.value, lonlat_proj, aeqd_proj) for tile_id + in tile_ids] + + return chain(*match_generators) + + +def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, search_domain_bounding_wkt, + search_parameter, radius_tolerance, lonlat_proj, aeqd_proj): + from nexustiles.model.nexusmodel import NexusPoint + from webservice.algorithms_spark.Matchup import DomsPoint # Must import DomsPoint or Spark complains + + # Load tile + try: + the_time = datetime.now() + tile = tile_service.mask_tiles_to_polygon(wkt.loads(search_domain_bounding_wkt), + tile_service.find_tile_by_id(tile_id))[0] + print "%s Time to load tile %s" % (str(datetime.now() - the_time), tile_id) + except IndexError: + # This should only happen if all measurements in a tile become masked after applying the bounding polygon + raise StopIteration + + # Convert valid tile lat,lon tuples to UTM tuples + the_time = datetime.now() + # Get list of indices of valid values + valid_indices = tile.get_indices() + primary_points = np.array( + [pyproj.transform(p1=lonlat_proj, p2=aeqd_proj, x=tile.longitudes[aslice[2]], y=tile.latitudes[aslice[1]]) for + aslice in valid_indices]) + + print "%s Time to convert primary points for tile %s" % (str(datetime.now() - the_time), tile_id) + + a_time = datetime.now() + p_tree = spatial.cKDTree(primary_points, leafsize=30) + print "%s Time to build primary tree" % (str(datetime.now() - a_time)) + + a_time = datetime.now() + matched_indexes = p_tree.query_ball_tree(m_tree, radius_tolerance) + print "%s Time to query primary tree for tile %s" % (str(datetime.now() - a_time), tile_id) + for i, point_matches in enumerate(matched_indexes): + if len(point_matches) > 0: + p_nexus_point = NexusPoint(tile.latitudes[valid_indices[i][1]], + tile.longitudes[valid_indices[i][2]], None, + tile.times[valid_indices[i][0]], valid_indices[i], + tile.data[tuple(valid_indices[i])]) + p_doms_point = DomsPoint.from_nexus_point(p_nexus_point, tile=tile, parameter=search_parameter) + for m_point_index in point_matches: + m_doms_point = DomsPoint.from_edge_point(edge_results[m_point_index]) + yield p_doms_point, m_doms_point + + +def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, itemsPerPage=1000, + startIndex=0, stats=True, session=None): + try: + startTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%dT%H:%M:%SZ') + except TypeError: + # Assume we were passed a properly formatted string + pass + + try: + endTime = datetime.utcfromtimestamp(endTime).strftime('%Y-%m-%dT%H:%M:%SZ') + except TypeError: + # Assume we were passed a properly formatted string + pass + + try: + platform = platform.split(',') + except AttributeError: + # Assume we were passed a list + pass + + params = {"variable": variable, + "startTime": startTime, + "endTime": endTime, + "bbox": bbox, + "minDepth": depth_min, + "maxDepth": depth_max, + "platform": platform, + "itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()} + + if session is not None: + edge_request = session.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params) + else: + edge_request = requests.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params) + + edge_request.raise_for_status() + edge_response = json.loads(edge_request.text) + + # Get all edge results + 
next_page_url = edge_response.get('next', None) + while next_page_url is not None: + if session is not None: + edge_page_request = session.get(next_page_url) + else: + edge_page_request = requests.get(next_page_url) + + edge_page_request.raise_for_status() + edge_page_response = json.loads(edge_page_request.text) + + edge_response['results'].extend(edge_page_response['results']) + + next_page_url = edge_page_response.get('next', None) + + return edge_response http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py new file mode 100644 index 0000000..e332654 --- /dev/null +++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py @@ -0,0 +1,248 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" +import logging + +import numpy as np +from nexustiles.nexustiles import NexusTileService + +# from time import time +from webservice.NexusHandler import nexus_handler, SparkHandler, DEFAULT_PARAMETERS_SPEC +from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException + + +@nexus_handler +class TimeAvgMapSparkHandlerImpl(SparkHandler): + name = "Time Average Map Spark" + path = "/timeAvgMapSpark" + description = "Computes a Latitude/Longitude Time Average plot given an arbitrary geographical area and time range" + params = DEFAULT_PARAMETERS_SPEC + singleton = True + + def __init__(self): + SparkHandler.__init__(self) + self.log = logging.getLogger(__name__) + # self.log.setLevel(logging.DEBUG) + + @staticmethod + def _map(tile_in_spark): + tile_bounds = tile_in_spark[0] + (min_lat, max_lat, min_lon, max_lon, + min_y, max_y, min_x, max_x) = tile_bounds + startTime = tile_in_spark[1] + endTime = tile_in_spark[2] + ds = tile_in_spark[3] + tile_service = NexusTileService() + # print 'Started tile {0}'.format(tile_bounds) + # sys.stdout.flush() + tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1) + # days_at_a_time = 90 + days_at_a_time = 30 + # days_at_a_time = 7 + # days_at_a_time = 1 + # print 'days_at_a_time = {0}'.format(days_at_a_time) + t_incr = 86400 * days_at_a_time + sum_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.float64)) + cnt_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.uint32)) + t_start = startTime + while t_start <= endTime: + t_end = min(t_start + t_incr, endTime) + # t1 = time() + # print 'nexus call start at time {0}'.format(t1) + # sys.stdout.flush() + # nexus_tiles = \ + # TimeAvgMapSparkHandlerImpl.query_by_parts(tile_service, + # min_lat, max_lat, + # min_lon, max_lon, + # ds, + # t_start, + # t_end, + # part_dim=2) + nexus_tiles = \ + tile_service.get_tiles_bounded_by_box(min_lat, max_lat, + min_lon, max_lon, + ds=ds, + start_time=t_start, + end_time=t_end) + # t2 = time() + # print 'nexus call end at time %f' % t2 + # print 'secs in nexus call: ', t2 - t1 + # print 't %d to %d - Got %d tiles' % (t_start, t_end, + # len(nexus_tiles)) + # for nt in nexus_tiles: + # print nt.granule + # print nt.section_spec + # print 'lat min/max:', np.ma.min(nt.latitudes), np.ma.max(nt.latitudes) + # print 'lon min/max:', np.ma.min(nt.longitudes), np.ma.max(nt.longitudes) + # sys.stdout.flush() + + for tile in nexus_tiles: + tile.data.data[:, :] = np.nan_to_num(tile.data.data) + sum_tile += 
tile.data.data[0, min_y:max_y + 1, min_x:max_x + 1] + cnt_tile += (~tile.data.mask[0, + min_y:max_y + 1, + min_x:max_x + 1]).astype(np.uint8) + t_start = t_end + 1 + + # print 'cnt_tile = ', cnt_tile + # cnt_tile.mask = ~(cnt_tile.data.astype(bool)) + # sum_tile.mask = cnt_tile.mask + # avg_tile = sum_tile / cnt_tile + # stats_tile = [[{'avg': avg_tile.data[y,x], 'cnt': cnt_tile.data[y,x]} for x in range(tile_inbounds_shape[1])] for y in range(tile_inbounds_shape[0])] + # print 'Finished tile', tile_bounds + # print 'Tile avg = ', avg_tile + # sys.stdout.flush() + return ((min_lat, max_lat, min_lon, max_lon), (sum_tile, cnt_tile)) + + def calc(self, computeOptions, **args): + """ + + :param computeOptions: StatsComputeOptions + :param args: dict + :return: + """ + + spark_master, spark_nexecs, spark_nparts = computeOptions.get_spark_cfg() + self._setQueryParams(computeOptions.get_dataset()[0], + (float(computeOptions.get_min_lat()), + float(computeOptions.get_max_lat()), + float(computeOptions.get_min_lon()), + float(computeOptions.get_max_lon())), + computeOptions.get_start_time(), + computeOptions.get_end_time(), + spark_master=spark_master, + spark_nexecs=spark_nexecs, + spark_nparts=spark_nparts) + + if 'CLIM' in self._ds: + raise NexusProcessingException( + reason="Cannot compute Latitude/Longitude Time Average plot on a climatology", code=400) + + nexus_tiles = self._find_global_tile_set() + # print 'tiles:' + # for tile in nexus_tiles: + # print tile.granule + # print tile.section_spec + # print 'lat:', tile.latitudes + # print 'lon:', tile.longitudes + + # nexus_tiles) + if len(nexus_tiles) == 0: + raise NoDataException(reason="No data found for selected timeframe") + + self.log.debug('Found {0} tiles'.format(len(nexus_tiles))) + + self.log.debug('Using Native resolution: lat_res={0}, lon_res={1}'.format(self._latRes, self._lonRes)) + nlats = int((self._maxLat - self._minLatCent) / self._latRes) + 1 + nlons = int((self._maxLon - self._minLonCent) / self._lonRes) + 1 + self.log.debug('nlats={0}, nlons={1}'.format(nlats, nlons)) + self.log.debug('center lat range = {0} to {1}'.format(self._minLatCent, + self._maxLatCent)) + self.log.debug('center lon range = {0} to {1}'.format(self._minLonCent, + self._maxLonCent)) + + # for tile in nexus_tiles: + # print 'lats: ', tile.latitudes.compressed() + # print 'lons: ', tile.longitudes.compressed() + # Create array of tuples to pass to Spark map function + nexus_tiles_spark = [[self._find_tile_bounds(t), + self._startTime, self._endTime, + self._ds] for t in nexus_tiles] + # print 'nexus_tiles_spark = ', nexus_tiles_spark + # Remove empty tiles (should have bounds set to None) + bad_tile_inds = np.where([t[0] is None for t in nexus_tiles_spark])[0] + for i in np.flipud(bad_tile_inds): + del nexus_tiles_spark[i] + + # Expand Spark map tuple array by duplicating each entry N times, + # where N is the number of ways we want the time dimension carved up. + num_time_parts = 72 + # num_time_parts = 1 + nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, axis=0) + self.log.debug('repeated len(nexus_tiles_spark) = {0}'.format(len(nexus_tiles_spark))) + + # Set the time boundaries for each of the Spark map tuples. + # Every Nth element in the array gets the same time bounds. 
+ spark_part_times = np.linspace(self._startTime, self._endTime, + num_time_parts + 1, dtype=np.int64) + + spark_part_time_ranges = \ + np.repeat([[[spark_part_times[i], + spark_part_times[i + 1]] for i in range(num_time_parts)]], + len(nexus_tiles_spark) / num_time_parts, axis=0).reshape((len(nexus_tiles_spark), 2)) + self.log.debug('spark_part_time_ranges={0}'.format(spark_part_time_ranges)) + nexus_tiles_spark[:, 1:3] = spark_part_time_ranges + # print 'nexus_tiles_spark final = ' + # for i in range(len(nexus_tiles_spark)): + # print nexus_tiles_spark[i] + + # Launch Spark computations + rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts) + sum_count_part = rdd.map(self._map) + sum_count = \ + sum_count_part.combineByKey(lambda val: val, + lambda x, val: (x[0] + val[0], + x[1] + val[1]), + lambda x, y: (x[0] + y[0], x[1] + y[1])) + fill = self._fill + avg_tiles = \ + sum_count.map(lambda (bounds, (sum_tile, cnt_tile)): + (bounds, [[{'avg': (sum_tile[y, x] / cnt_tile[y, x]) + if (cnt_tile[y, x] > 0) + else fill, + 'cnt': cnt_tile[y, x]} + for x in + range(sum_tile.shape[1])] + for y in + range(sum_tile.shape[0])])).collect() + + # Combine subset results to produce global map. + # + # The tiles below are NOT Nexus objects. They are tuples + # with the time avg map data and lat-lon bounding box. + a = np.zeros((nlats, nlons), dtype=np.float64, order='C') + n = np.zeros((nlats, nlons), dtype=np.uint32, order='C') + for tile in avg_tiles: + if tile is not None: + ((tile_min_lat, tile_max_lat, tile_min_lon, tile_max_lon), + tile_stats) = tile + tile_data = np.ma.array( + [[tile_stats[y][x]['avg'] for x in range(len(tile_stats[0]))] for y in range(len(tile_stats))]) + tile_cnt = np.array( + [[tile_stats[y][x]['cnt'] for x in range(len(tile_stats[0]))] for y in range(len(tile_stats))]) + tile_data.mask = ~(tile_cnt.astype(bool)) + y0 = self._lat2ind(tile_min_lat) + y1 = y0 + tile_data.shape[0] - 1 + x0 = self._lon2ind(tile_min_lon) + x1 = x0 + tile_data.shape[1] - 1 + if np.any(np.logical_not(tile_data.mask)): + self.log.debug( + 'writing tile lat {0}-{1}, lon {2}-{3}, map y {4}-{5}, map x {6}-{7}'.format(tile_min_lat, + tile_max_lat, + tile_min_lon, + tile_max_lon, y0, + y1, x0, x1)) + a[y0:y1 + 1, x0:x1 + 1] = tile_data + n[y0:y1 + 1, x0:x1 + 1] = tile_cnt + else: + self.log.debug( + 'All pixels masked in tile lat {0}-{1}, lon {2}-{3}, map y {4}-{5}, map x {6}-{7}'.format( + tile_min_lat, tile_max_lat, + tile_min_lon, tile_max_lon, + y0, y1, x0, x1)) + + # Store global map in a NetCDF file. + self._create_nc_file(a, 'tam.nc', 'val', fill=self._fill) + + # Create dict for JSON response + results = [[{'avg': a[y, x], 'cnt': int(n[y, x]), + 'lat': self._ind2lat(y), 'lon': self._ind2lon(x)} + for x in range(a.shape[1])] for y in range(a.shape[0])] + + return TimeAvgMapSparkResults(results=results, meta={}, computeOptions=computeOptions) + + +class TimeAvgMapSparkResults(NexusResults): + def __init__(self, results=None, meta=None, computeOptions=None): + NexusResults.__init__(self, results=results, meta=meta, stats=None, computeOptions=computeOptions)
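TimeAvgMapSpark reduces each lat/lon box to elementwise (sum, count) grids and only divides at the end, so cells with no observations can be filled or masked instead of producing NaNs. A NumPy-only sketch of that accumulation pattern (toy 2x2 grids stand in for the per-tile arrays; the fill value is hypothetical):

import numpy as np

# Two partial (sum, count) grids for the same bounding box,
# e.g. produced by _map for two time slices of one tile region.
sum_a = np.array([[1.0, 2.0], [0.0, 4.0]])
cnt_a = np.array([[1, 1], [0, 2]], dtype=np.uint32)
sum_b = np.array([[3.0, 0.0], [0.0, 6.0]])
cnt_b = np.array([[1, 0], [0, 3]], dtype=np.uint32)

# Merge step (the combineByKey lambdas): elementwise addition.
sum_t = sum_a + sum_b
cnt_t = cnt_a + cnt_b

# Final step (the avg_tiles map): average where observed, fill where cnt == 0.
fill = -9999.0
avg = np.where(cnt_t > 0, sum_t / np.maximum(cnt_t, 1), fill)
print(avg)  # 2.0 in every observed cell, -9999.0 where cnt_t == 0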
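For the search-domain padding in Matchup.py's match_satellite_to_insitu, add_meters_to_lon_lat uses a flat-earth approximation: 1 degree of latitude is taken as 111,111 m and 1 degree of longitude as 111,111 m * cos(latitude). A usage sketch of how the driver widens a partition's bounding box by the radius tolerance before querying edge; the bounding-box numbers here are made up:

from math import cos, radians


def add_meters_to_lon_lat(lon, lat, meters):
    # Same approximation as in Matchup.py; negative meters move the point
    # south-west, which widens the box when applied to the min corner.
    longitude = lon + ((meters / 111111.0) * cos(radians(lat)))
    latitude = lat + (meters / 111111.0)
    return longitude, latitude


# Hypothetical tile bounds (min_lon, min_lat, max_lon, max_lat) and a 1 km tolerance.
min_lon, min_lat, max_lon, max_lat = -120.0, 30.0, -119.0, 31.0
rt = 1000.0
matchup_min_lon, matchup_min_lat = add_meters_to_lon_lat(min_lon, min_lat, -1 * rt)
matchup_max_lon, matchup_max_lat = add_meters_to_lon_lat(max_lon, max_lat, rt)
print((matchup_min_lon, matchup_min_lat, matchup_max_lon, matchup_max_lat))
# approximately (-120.0078, 29.991, -118.9923, 31.009): each edge moved ~1 km outward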