This is an automated email from the ASF dual-hosted git repository. skperez pushed a commit to branch SDAP-371 in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit 8611706e633f098b113e33b80e4b008155eb1555 Author: skorper <[email protected]> AuthorDate: Fri Jun 10 17:36:31 2022 -0700 Fixed DOMS subsetter for satellite data --- analysis/webservice/algorithms/doms/subsetter.py | 104 ++++++++++++++++++++--- data-access/nexustiles/model/nexusmodel.py | 20 +++-- data-access/nexustiles/nexustiles.py | 12 ++- 3 files changed, 119 insertions(+), 17 deletions(-) diff --git a/analysis/webservice/algorithms/doms/subsetter.py b/analysis/webservice/algorithms/doms/subsetter.py index 4c1ff97..ed1f552 100644 --- a/analysis/webservice/algorithms/doms/subsetter.py +++ b/analysis/webservice/algorithms/doms/subsetter.py @@ -15,17 +15,20 @@ import logging import os +import io import tempfile import zipfile +from pytz import timezone from datetime import datetime import requests from . import BaseDomsHandler from webservice.NexusHandler import nexus_handler -from webservice.webmodel import NexusProcessingException +from webservice.webmodel import NexusProcessingException, NexusResults ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' +EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) def is_blank(my_string): @@ -121,14 +124,14 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler): try: start_time = request.get_start_datetime() - start_time = start_time.strftime("%Y-%m-%dT%H:%M:%SZ") + start_time = int((start_time - EPOCH).total_seconds()) except: raise NexusProcessingException( reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", code=400) try: end_time = request.get_end_datetime() - end_time = end_time.strftime("%Y-%m-%dT%H:%M:%SZ") + end_time = int((end_time - EPOCH).total_seconds()) except: raise NexusProcessingException( reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", @@ -167,6 +170,55 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler): bounding_polygon, depth_min, depth_max, platforms def calc(self, request, **args): + primary_ds_name, matchup_ds_names, parameter_s, start_time, end_time, \ + bounding_polygon, depth_min, depth_max, platforms = self.parse_arguments(request) + + min_lat = max_lat = min_lon = max_lon = None + if bounding_polygon: + min_lat = bounding_polygon.bounds[1] + max_lat = bounding_polygon.bounds[3] + min_lon = bounding_polygon.bounds[0] + max_lon = bounding_polygon.bounds[2] + + tiles = self._get_tile_service().get_tiles_bounded_by_box(min_lat, max_lat, min_lon, + max_lon, primary_ds_name, start_time, + end_time) + else: + tiles = [] # todo + # tiles = self._get_tile_service().get_tiles_by_metadata(metadata_filter, ds, start_time, + # end_time) + + data = [] + for tile in tiles: + for nexus_point in tile.nexus_point_generator(): + if tile.is_multi: + data_points = { + tile.variables[idx].standard_name: nexus_point.data_vals[idx] + for idx in range(len(tile.variables)) + } + else: + data_points = {tile.variables[0].standard_name: nexus_point.data_vals} + data.append({ + 'latitude': nexus_point.latitude, + 'longitude': nexus_point.longitude, + 'time': nexus_point.time, + 'data': data_points + }) + if len(tiles) > 0: + meta = [tile.get_summary() for tile in tiles] + else: + meta = None + + result = SubsetResult( + results=data, + meta=meta + ) + + result.extendMeta(min_lat, max_lat, min_lon, max_lon, "", start_time, end_time) + + return result + + def calc2(self, request, **args): primary_ds_name, matchup_ds_names, parameter_s, start_time, end_time, \ bounding_polygon, depth_min, depth_max, platforms = self.parse_arguments(request) @@ -235,18 +287,50 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler): return SubsetResult(zip_path) -class SubsetResult(object): - def __init__(self, zip_path): - self.zip_path = zip_path - +class SubsetResult(NexusResults): def toJson(self): raise NotImplementedError + def toCsv(self): + """ + Convert results to CSV + """ + rows = [] + + headers = [ + 'longitude', + 'latitude', + 'time' + ] + + results = self.results() + + data_variables = set([keys for result in results for keys in result['data'].keys()]) + headers.extend(data_variables) + for i, result in enumerate(results): + cols = [] + + cols.append(result['longitude']) + cols.append(result['latitude']) + cols.append(datetime.utcfromtimestamp(result['time']).strftime('%Y-%m-%dT%H:%M:%SZ')) + + for var in data_variables: + cols.append(result['data'][var]) + if i == 0: + rows.append(','.join(headers)) + rows.append(','.join(map(str, cols))) + + return "\r\n".join(rows) + def toZip(self): - with open(self.zip_path, 'rb') as zip_file: - zip_contents = zip_file.read() + csv_contents = self.toCsv() + + buffer = io.BytesIO() + with zipfile.ZipFile(buffer, 'a', zipfile.ZIP_DEFLATED) as zip_file: + zip_file.writestr('result.csv', csv_contents) - return zip_contents + buffer.seek(0) + return buffer.read() def cleanup(self): os.remove(self.zip_path) diff --git a/data-access/nexustiles/model/nexusmodel.py b/data-access/nexustiles/model/nexusmodel.py index 753d264..f5c9df6 100644 --- a/data-access/nexustiles/model/nexusmodel.py +++ b/data-access/nexustiles/model/nexusmodel.py @@ -126,22 +126,30 @@ class Tile(object): return summary def nexus_point_generator(self, include_nan=False): + indices = self.get_indices(include_nan) + if include_nan: - for index in np.ndindex(self.data.shape): + for index in indices: time = self.times[index[0]] lat = self.latitudes[index[1]] lon = self.longitudes[index[2]] - data_val = self.data[index] - point = NexusPoint(lat, lon, None, time, index, data_val) + if self.is_multi: + data_vals = [data[index] for data in self.data] + else: + data_vals = self.data[index] + point = NexusPoint(lat, lon, None, time, index, data_vals) yield point else: - for index in np.transpose(np.ma.nonzero(self.data)): + for index in indices: index = tuple(index) time = self.times[index[0]] lat = self.latitudes[index[1]] lon = self.longitudes[index[2]] - data_val = self.data[index] - point = NexusPoint(lat, lon, None, time, index, data_val) + if self.is_multi: + data_vals = [data[index] for data in self.data] + else: + data_vals = self.data[index] + point = NexusPoint(lat, lon, None, time, index, data_vals) yield point def get_indices(self, include_nan=False): diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py index 7483c2b..88a1687 100644 --- a/data-access/nexustiles/nexustiles.py +++ b/data-access/nexustiles/nexustiles.py @@ -260,6 +260,7 @@ class NexusTileService(object): def get_tiles_bounded_by_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, **kwargs): tiles = self.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, **kwargs) + tiles = tiles[:1] # TODO REMOVE ME!!! tiles = self.mask_tiles_to_bbox(min_lat, max_lat, min_lon, max_lon, tiles) if 0 <= start_time <= end_time: tiles = self.mask_tiles_to_time_range(start_time, end_time, tiles) @@ -423,7 +424,16 @@ class NexusTileService(object): | ma.getmaskarray(tile.latitudes)[np.newaxis, :, np.newaxis] \ | ma.getmaskarray(tile.longitudes)[np.newaxis, np.newaxis, :] - tile.data = ma.masked_where(data_mask, tile.data) + # If this is multi-var, need to mask each variable separately. + if tile.is_multi: + # Combine space/time mask with existing mask on data + data_mask = reduce(np.logical_or, [tile.data[0].mask, data_mask]) + + num_vars = len(tile.data) + multi_data_mask = np.repeat(data_mask[np.newaxis, ...], num_vars, axis=0) + tile.data = ma.masked_where(multi_data_mask, tile.data) + else: + tile.data = ma.masked_where(data_mask, tile.data) tiles[:] = [tile for tile in tiles if not tile.data.mask.all()]
