This is an automated email from the ASF dual-hosted git repository. rkk pushed a commit to branch tmp-stv in repository https://gitbox.apache.org/repos/asf/sdap-nexus.git
commit 7395f11a08c0b89c581e3b0eeee82165e85f01ab Author: rileykk <[email protected]> AuthorDate: Thu Sep 28 13:15:55 2023 -0700 Lon + Lat tomo fetch impls --- analysis/webservice/algorithms/Tomogram.py | 472 +++++++++++++++++++++++++---- 1 file changed, 411 insertions(+), 61 deletions(-) diff --git a/analysis/webservice/algorithms/Tomogram.py b/analysis/webservice/algorithms/Tomogram.py index 0507e42..d48d94d 100644 --- a/analysis/webservice/algorithms/Tomogram.py +++ b/analysis/webservice/algorithms/Tomogram.py @@ -15,7 +15,7 @@ import logging from io import BytesIO -from typing import Dict, Literal, Union +from typing import Dict, Literal, Union, Tuple import matplotlib.pyplot as plt import numpy as np @@ -34,16 +34,6 @@ class TomogramBaseClass(NexusCalcHandler): **kwargs ): NexusCalcHandler.__init__(self, tile_service_factory) - self.__slice_bounds = None - self.__margin = None - - def _set_params( - self, - slice_bounds: Dict[Literal['lat', 'lon', 'elevation'], Union[slice, float]], - margin: float - ): - self.__slice_bounds = slice_bounds - self.__margin = margin def parse_args(self, compute_options): try: @@ -55,31 +45,31 @@ class TomogramBaseClass(NexusCalcHandler): return ds, parameter_s - def do_subset(self, ds, parameter): + def do_subset(self, ds, parameter, bounds, margin): tile_service = self._get_tile_service() - bounds = self.__slice_bounds + # bounds = self.__slice_bounds if isinstance(bounds['lat'], slice): min_lat = bounds['lat'].start max_lat = bounds['lat'].stop else: - min_lat = bounds['lat'] - self.__margin - max_lat = bounds['lat'] + self.__margin + min_lat = bounds['lat'] - margin + max_lat = bounds['lat'] + margin if isinstance(bounds['lon'], slice): min_lon = bounds['lon'].start max_lon = bounds['lon'].stop else: - min_lon = bounds['lon'] - self.__margin - max_lon = bounds['lon'] + self.__margin + min_lon = bounds['lon'] - margin + max_lon = bounds['lon'] + margin if isinstance(bounds['elevation'], slice): min_elevation = bounds['elevation'].start max_elevation = bounds['elevation'].stop else: - min_elevation = bounds['elevation'] - self.__margin - max_elevation = bounds['elevation'] + self.__margin + min_elevation = bounds['elevation'] - margin + max_elevation = bounds['elevation'] + margin tiles = tile_service.find_tiles_in_box( min_lat, max_lat, min_lon, max_lon, @@ -89,7 +79,7 @@ class TomogramBaseClass(NexusCalcHandler): fetch_data=False ) - logger.info(f'Fetched {len(tiles):,}') + logger.info(f'Matched {len(tiles):,} tiles from Solr') data = [] @@ -132,26 +122,39 @@ class TomogramBaseClass(NexusCalcHandler): 'data': data_val }) - logger.info('Fetched data, organizing it for plotting') - return data + @staticmethod + def data_subset_to_ds(data_in_bounds: list) -> Tuple[np.ndarray, np.ndarray, np.ndarray, xr.Dataset]: + lats = np.unique([d['latitude'] for d in data_in_bounds]) + lons = np.unique([d['longitude'] for d in data_in_bounds]) -# # @nexus_handler -# class LatitudeTomogramImpl(TomogramBaseClass): -# pass -# -# -# # @nexus_handler -# class LongitudeTomogramImpl(TomogramBaseClass): -# pass + vals = np.empty((len(lats), len(lons))) + + data_dict = {(d['latitude'], d['longitude']): d['data'] for d in data_in_bounds} + + for i, lat in enumerate(lats): + for j, lon in enumerate(lons): + vals[i, j] = data_dict.get((lat, lon), np.nan) + + ds = xr.Dataset( + data_vars=dict( + tomo=(('latitude', 'longitude'), vals) + ), + coords=dict( + latitude=(['latitude'], lats), + longitude=(['longitude'], lons) + ) + ) + + return lats, lons, vals, ds @nexus_handler class ElevationTomogramImpl(TomogramBaseClass): name = "Elevation-sliced Tomogram" path = "/tomogram/elevation" - description = "Fetches point values for a given dataset and geographical area" + description = "Fetches a tomogram sliced by elevation" params = { "ds": { "name": "Dataset", @@ -202,8 +205,8 @@ class ElevationTomogramImpl(TomogramBaseClass): return ds, parameter, bounding_poly, elevation, margin - def calc(self, computeOptions, **args): - ds, parameter, bounding_poly, elevation, margin = self.parse_args(computeOptions) + def calc(self, compute_options, **args): + dataset, parameter, bounding_poly, elevation, margin = self.parse_args(compute_options) min_lat = bounding_poly.bounds[1] max_lat = bounding_poly.bounds[3] @@ -216,40 +219,351 @@ class ElevationTomogramImpl(TomogramBaseClass): elevation=float(elevation) ) - self._set_params(slices, margin) - data_in_bounds = self.do_subset(ds, parameter) + data_in_bounds = self.do_subset(dataset, parameter, slices, margin) + logger.info(f'Fetched {len(data_in_bounds):,} data points, organizing them for plotting') - lats = np.unique([d['latitude'] for d in data_in_bounds]) - lons = np.unique([d['longitude'] for d in data_in_bounds]) + lats, lons, vals, ds = TomogramBaseClass.data_subset_to_ds(data_in_bounds) - vals = np.empty((len(lats), len(lons))) + result = ElevationTomoResults( + (vals, lats, lons, dict(ds=dataset, elevation=elevation, margin=margin)), + ) - data_dict = {(d['latitude'], d['longitude']): d['data'] for d in data_in_bounds} + return result - for i, lat in enumerate(lats): - for j, lon in enumerate(lons): - vals[i, j] = data_dict.get((lat, lon), np.nan) - ds = xr.Dataset( - data_vars=dict( - tomo=(('latitude', 'longitude'), vals) - ), - coords=dict( - latitude=(['latitude'], lats), - longitude=(['longitude'], lons) - ), - attrs=dict( - ds=ds, - elevation=elevation, - margin=margin - ) +@nexus_handler +class LongitudeTomogramImpl(TomogramBaseClass): + name = "Tomogram Latitude profile view" + path = "/tomogram/longitude" + description = "Fetches a tomogram sliced by longitude" + params = { + "ds": { + "name": "Dataset", + "type": "string", + "description": "The Dataset shortname to use in calculation. Required" + }, + "parameter": { + "name": "Parameter", + "type": "string", + "description": "The parameter of interest." + }, + "longitude": { + "name": "Longitude", + "type": "float", + "description": "Longitude at which to slice the tomogram. Required." + }, + "minLat": { + "name": "Minimum Latitude", + "type": "float", + "description": "Minimum (Southern) Latitude. Required." + }, + "maxLat": { + "name": "Maximum Latitude", + "type": "float", + "description": "Maximum (Northern) Latitude. Required." + }, + "minElevation": { + "name": "Minimum Elevation", + "type": "int", + "description": "Minimum Elevation. Required." + }, + "maxElevation": { + "name": "Maximum Elevation", + "type": "int", + "description": "Maximum Elevation. Required." + }, + "stride": { + "name": "Stride", + "type": "integer", + "description": "Stride between elevation slices. Default: 1" + }, + "elevationMargin": { + "name": "Elevation Margin", + "type": "float", + "description": "Margin +/- desired elevation to include in output. Default: 0.5m" + }, + "horizontalMargin": { + "name": "Horizontal Margin", + "type": "float", + "description": "Margin +/- desired lat/lon slice to include in output. Default: 0.001m" + }, + } + singleton = True + + def __init__(self, tile_service_factory, **kwargs): + TomogramBaseClass.__init__(self, tile_service_factory, **kwargs) + + def parse_args(self, compute_options): + ds, parameter = super().parse_args(compute_options) + + def get_required_float(name): + val = compute_options.get_float_arg(name) + + if val is None: + raise NexusProcessingException(reason=f'Missing required parameter: {name}', code=400) + + return val + + def get_required_int(name): + val = compute_options.get_int_arg(name) + + if val is None: + raise NexusProcessingException(reason=f'Missing required parameter: {name}', code=400) + + return val + + longitude = get_required_float('longitude') + min_lat = get_required_float('minLat') + max_lat = get_required_float('maxLat') + min_elevation = get_required_int('minElevation') + max_elevation = get_required_int('maxElevation') + + elevation_margin = compute_options.get_float_arg('elevationMargin', 0.5) + horizontal_margin = compute_options.get_float_arg('horizontalMargin', 0.001) + stride = compute_options.get_int_arg('stride', 1) + + return (ds, parameter, longitude, min_lat, max_lat, min_elevation, max_elevation, elevation_margin, + horizontal_margin, stride) + + def calc(self, compute_options, **args): + (dataset, parameter, longitude, min_lat, max_lat, min_elevation, max_elevation, elevation_margin, + horizontal_margin, stride) = self.parse_args(compute_options) + + slices = dict( + lat=slice(min_lat, max_lat), + lon=slice(longitude - horizontal_margin, longitude + horizontal_margin), ) - result = ElevationTomoResults( - (vals, lats, lons, dict(ds=ds, elevation=elevation, margin=margin)), + slice_dict = {} + lats = [] + n_points = 0 + r = range(min_elevation, max_elevation, stride) + + logger.info(f'Fetching {len(r):,} elevation slices') + + for e in r: + logger.info(f'Fetching elevation: {e}') + + slices['elevation'] = e + + data_in_bounds = self.do_subset(dataset, parameter, slices, elevation_margin) + logger.info(f'Fetched {len(data_in_bounds):,} data points at this elevation') + n_points += len(data_in_bounds) + + ds = TomogramBaseClass.data_subset_to_ds(data_in_bounds)[3] + + if ds.tomo.shape == (0, 0): + continue + + slice_dict[e] = ds + lats.extend(ds.latitude) + + logger.info('Data fetch complete, organizing data to rows') + + lats = np.unique(lats) + n_lats = len(lats) + rows = [] + + for e in r: + row = np.full((n_lats,), np.nan) + + if e not in slice_dict: + rows.append(row) + continue + + ds = slice_dict[e] + + try: + ds = ds.sel(longitude=longitude) + + for lat, vox in zip(ds.latitude, ds.tomo): + lat, vox = lat.item(), vox.item() + + lat_i = np.where(lats == lat)[0][0] + + row[lat_i] = vox + except: + rows.append(row) + continue + + rows.append(row) + + rows = 10*np.log10(np.array(rows)) + + return ProfileTomoResults( + results=rows, + s={'longitude': longitude}, + extent=[lats[0], lats[-1], min_elevation, max_elevation], + meta=dict(dataset=dataset) + ) + + +@nexus_handler +class LatitudeTomogramImpl(TomogramBaseClass): + name = "Tomogram Latitude profile view" + path = "/tomogram/latitude" + description = "Fetches a tomogram sliced by latitude" + params = { + "ds": { + "name": "Dataset", + "type": "string", + "description": "The Dataset shortname to use in calculation. Required" + }, + "parameter": { + "name": "Parameter", + "type": "string", + "description": "The parameter of interest." + }, + "latitude": { + "name": "Latitude", + "type": "float", + "description": "Latitude at which to slice the tomogram. Required." + }, + "minLon": { + "name": "Minimum Longitude", + "type": "float", + "description": "Minimum (Eastern) Longitude. Required." + }, + "maxLon": { + "name": "Maximum Longitude", + "type": "float", + "description": "Maximum (Western) Longitude. Required." + }, + "minElevation": { + "name": "Minimum Elevation", + "type": "int", + "description": "Minimum Elevation. Required." + }, + "maxElevation": { + "name": "Maximum Elevation", + "type": "int", + "description": "Maximum Elevation. Required." + }, + "stride": { + "name": "Stride", + "type": "integer", + "description": "Stride between elevation slices. Default: 1" + }, + "elevationMargin": { + "name": "Elevation Margin", + "type": "float", + "description": "Margin +/- desired elevation to include in output. Default: 0.5m" + }, + "horizontalMargin": { + "name": "Horizontal Margin", + "type": "float", + "description": "Margin +/- desired lat/lon slice to include in output. Default: 0.001m" + }, + } + singleton = True + + def __init__(self, tile_service_factory, **kwargs): + TomogramBaseClass.__init__(self, tile_service_factory, **kwargs) + + def parse_args(self, compute_options): + ds, parameter = super().parse_args(compute_options) + + def get_required_float(name): + val = compute_options.get_float_arg(name) + + if val is None: + raise NexusProcessingException(reason=f'Missing required parameter: {name}', code=400) + + return val + + def get_required_int(name): + val = compute_options.get_int_arg(name) + + if val is None: + raise NexusProcessingException(reason=f'Missing required parameter: {name}', code=400) + + return val + + latitude = get_required_float('latitude') + min_lon = get_required_float('minLon') + max_lon = get_required_float('maxLon') + min_elevation = get_required_int('minElevation') + max_elevation = get_required_int('maxElevation') + + elevation_margin = compute_options.get_float_arg('elevationMargin', 0.5) + horizontal_margin = compute_options.get_float_arg('horizontalMargin', 0.001) + stride = compute_options.get_int_arg('stride', 1) + + return (ds, parameter, latitude, min_lon, max_lon, min_elevation, max_elevation, elevation_margin, + horizontal_margin, stride) + + def calc(self, compute_options, **args): + (dataset, parameter, latitude, min_lon, max_lon, min_elevation, max_elevation, elevation_margin, + horizontal_margin, stride) = self.parse_args(compute_options) + + slices = dict( + lon=slice(min_lon, max_lon), + lat=slice(latitude - horizontal_margin, latitude + horizontal_margin), + ) + + slice_dict = {} + lons = [] + n_points = 0 + r = range(min_elevation, max_elevation, stride) + + logger.info(f'Fetching {len(r):,} elevation slices') + + for e in r: + logger.info(f'Fetching elevation: {e}') + + slices['elevation'] = e + + data_in_bounds = self.do_subset(dataset, parameter, slices, elevation_margin) + logger.info(f'Fetched {len(data_in_bounds):,} data points at this elevation') + n_points += len(data_in_bounds) + + ds = TomogramBaseClass.data_subset_to_ds(data_in_bounds)[3] + + if ds.tomo.shape == (0, 0): + continue + + slice_dict[e] = ds + lons.extend(ds.latitude) + + logger.info('Data fetch complete, organizing data to rows') + + lons = np.unique(lons) + n_lons = len(lons) + rows = [] + + for e in r: + row = np.full((n_lons,), np.nan) + + if e not in slice_dict: + rows.append(row) + continue + + ds = slice_dict[e] + + try: + ds = ds.sel(latitude=latitude) + + for lon, vox in zip(ds.longitude, ds.tomo): + lon, vox = lon.item(), vox.item() + + lon_i = np.where(lons == lon)[0][0] + + row[lon_i] = vox + except: + rows.append(row) + continue + + rows.append(row) + + rows = 10*np.log10(np.array(rows)) + + return ProfileTomoResults( + results=rows, + s={'latitude': latitude}, + extent=[lons[0], lons[-1], min_elevation, max_elevation], + meta=dict(dataset=dataset) ) - return result class ElevationTomoResults(NexusResults): def __init__(self, results=None, meta=None, stats=None, computeOptions=None, status_code=200, **args): @@ -261,10 +575,12 @@ class ElevationTomoResults(NexusResults): lats = lats.tolist() lons = lons.tolist() + logger.info('Building plot') + plt.figure(figsize=(15,11)) plt.imshow(np.squeeze(10*np.log10(data)), vmax=-10, vmin=-30) plt.colorbar(label=f'Tomogram, z={attrs["elevation"]} ± {attrs["margin"]} m (dB)') - plt.title(f'Tomogram @ {attrs["elevation"]} ± {attrs["margin"]} m elevation') + plt.title(f'{attrs["ds"]} tomogram @ {attrs["elevation"]} ± {attrs["margin"]} m elevation') plt.ylabel('Latitude') plt.xlabel('Longitude') @@ -278,10 +594,44 @@ class ElevationTomoResults(NexusResults): buffer = BytesIO() + logger.info('Writing plot to buffer') plt.savefig(buffer, format='png', facecolor='white') buffer.seek(0) return buffer.read() - def toJson(self): - pass + +class ProfileTomoResults(NexusResults): + def __init__(self, results=None, s=None, extent=None, meta=None, stats=None, computeOptions=None, status_code=200, **args): + NexusResults.__init__(self, results, meta, stats, computeOptions, status_code, **args) + self.__extent = extent + self.__slice = s + + def toImage(self): + rows = self.results() + + if 'longitude' in self.__slice: + lon = self.__slice['longitude'] + title_row = f'Longitude: {lon}' + xlabel = 'Latitude' + else: + lat = self.__slice['latitude'] + title_row = f'Latitude: {lat}' + xlabel = 'Longitude' + + logger.info('Building plot') + + plt.figure(figsize=(11, 7)) + plt.imshow(np.flipud(rows), extent=self.__extent, vmin=-30, vmax=-10, aspect='auto') + plt.colorbar() + plt.title(f'{self.meta()["dataset"]} tomogram slice\n{title_row}') + plt.xlabel(xlabel) + plt.ylabel('Height Relative to WGS84 ellipsoid (m)') + + buffer = BytesIO() + + logger.info('Writing plot to buffer') + plt.savefig(buffer, format='png', facecolor='white') + + buffer.seek(0) + return buffer.read()
