This is an automated email from the ASF dual-hosted git repository.
rkk pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/sdap-nexus.git
The following commit(s) were added to refs/heads/develop by this push:
new 28573fe SDAP-469 - Support for data with elevation (#276)
28573fe is described below
commit 28573fe2fea4f500b476ba780163a7e7c454e88a
Author: Riley Kuttruff <[email protected]>
AuthorDate: Fri Jul 5 13:10:51 2024 -0700
SDAP-469 - Support for data with elevation (#276)
* SDAP-469 - Configured timeseriesspark to have the option to bound by depth
* Elevation fetch & masking
untested
* Elevation subsetting
* fixes
* Check if there's elevation when pulling the tile data
* Elevation masking for tiles with no elevation does nothing
* Add elevation fields to collection creation script
* Name elevation fields to be dynamically typed correctly
* Elevation array shape + add to nexuspoint
* Add elev to data in bounds result
* Improved handling of elev arguments & masking
* tile elev in nexusproto backend
* Docker fix
* Additional algorithm support for SDAP 469 3d data (#10)
* Added support for default elevation value of 0 when parameter not provided
* Added support for setting min and max elevation when using single
elevation arg
* Added support for data ingested with elevation
* Fixed logic for elevation clause when min and max elevations are
equivalent
* Fixed logic for elevation clause when min and max elevations are
equivalent
* Bug fix for missing elevation parameters
* Reverted timeAvgMapSpark to use NexusRequestObject
* Fixed bug with order of arguments
* Commented out saving netcdf files
* Bug fix for how numpy arrays are handled
* Cleaned up logic for handling the different arg cases
* Reworked elevation clause logic and added to polygon search
* Added elevation args to find_tiles_in_polygon
---------
Co-authored-by: rileykk <[email protected]>
Co-authored-by: Kevin <[email protected]>
---
.../webservice/algorithms/DataInBoundsSearch.py | 96 ++++++++++++--
.../webservice/algorithms_spark/ClimMapSpark.py | 2 +-
.../webservice/algorithms_spark/CorrMapSpark.py | 2 +-
.../DailyDifferenceAverageSpark.py | 14 +-
.../webservice/algorithms_spark/HofMoellerSpark.py | 20 ++-
.../algorithms_spark/MaximaMinimaSpark.py | 24 +++-
.../webservice/algorithms_spark/TimeAvgMapSpark.py | 18 ++-
.../webservice/algorithms_spark/TimeSeriesSpark.py | 32 +++--
.../webservice/algorithms_spark/VarianceSpark.py | 30 +++--
analysis/webservice/apidocs/openapi.yml | 144 +++++++++++++++++++++
.../request/handlers/NexusRequestHandler.py | 8 +-
.../request/renderers/NexusJSONRenderer.py | 13 ++
analysis/webservice/webmodel/NexusRequestObject.py | 39 ++++++
.../nexustiles/backends/nexusproto/backend.py | 5 +
.../backends/nexusproto/dao/CassandraProxy.py | 32 +++++
.../backends/nexusproto/dao/SolrProxy.py | 41 +++++-
data-access/nexustiles/model/nexusmodel.py | 17 ++-
data-access/nexustiles/nexustiles.py | 72 +++++++++++
docker/nexus-webapp/Dockerfile | 2 +-
docker/solr/cloud-init/create-collection.py | 2 +
20 files changed, 555 insertions(+), 58 deletions(-)
diff --git a/analysis/webservice/algorithms/DataInBoundsSearch.py
b/analysis/webservice/algorithms/DataInBoundsSearch.py
index 2df061f..dcac93b 100644
--- a/analysis/webservice/algorithms/DataInBoundsSearch.py
+++ b/analysis/webservice/algorithms/DataInBoundsSearch.py
@@ -14,8 +14,11 @@
# limitations under the License.
+import io
+import gzip
import json
import numpy
+import logging
from datetime import datetime
from pytz import timezone
@@ -68,6 +71,9 @@ class DataInBoundsSearchCalcHandlerImpl(NexusCalcHandler):
}
singleton = True
+ def __init__(self, tile_service_factory, **kwargs):
+ NexusCalcHandler.__init__(self, tile_service_factory,
desired_projection='swath')
+
def parse_arguments(self, request):
# Parse input arguments
@@ -114,27 +120,68 @@ class DataInBoundsSearchCalcHandlerImpl(NexusCalcHandler):
"Maximum (Northern) Latitude. 'metadataFilter' must
be in the form key:value",
code=400)
- return ds, parameter_s, start_time, end_time, bounding_polygon,
metadata_filter
+ min_elevation, max_elevation = request.get_elevation_args()
+
+ if (min_elevation and max_elevation) and min_elevation > max_elevation:
+ raise NexusProcessingException(
+ reason='Min elevation must be less than or equal to max
elevation',
+ code=400
+ )
+
+ compact_result = request.get_boolean_arg('compact')
+
+ return ds, parameter_s, start_time, end_time, bounding_polygon,
metadata_filter, min_elevation, max_elevation, compact_result
def calc(self, computeOptions, **args):
- ds, parameter, start_time, end_time, bounding_polygon, metadata_filter
= self.parse_arguments(computeOptions)
+ ds, parameter, start_time, end_time, bounding_polygon,\
+ metadata_filter, min_elevation, max_elevation, compact =
self.parse_arguments(computeOptions)
includemeta = computeOptions.get_include_meta()
+ log = logging.getLogger(__name__)
+
min_lat = max_lat = min_lon = max_lon = None
+ tile_service = self._get_tile_service()
+
if bounding_polygon:
min_lat = bounding_polygon.bounds[1]
max_lat = bounding_polygon.bounds[3]
min_lon = bounding_polygon.bounds[0]
max_lon = bounding_polygon.bounds[2]
- tiles = self._get_tile_service().get_tiles_bounded_by_box(min_lat,
max_lat, min_lon, max_lon, ds, start_time,
- end_time)
+ tiles = tile_service.find_tiles_in_box(min_lat, max_lat, min_lon,
max_lon, ds, start_time, end_time,
+
min_elevation=min_elevation, max_elevation=max_elevation, fetch_data=False)
+
+ need_to_fetch = True
else:
tiles =
self._get_tile_service().get_tiles_by_metadata(metadata_filter, ds, start_time,
end_time)
+ need_to_fetch = False
data = []
- for tile in tiles:
+
+ log.info(f'Matched {len(tiles):,} tiles.')
+
+ for i in range(len(tiles)-1, -1, -1): # tile in tiles:
+ tile = tiles.pop(i)
+
+ tile_id = tile.tile_id
+
+ log.info(f'Processing tile {tile_id} | {i=}')
+
+ if need_to_fetch:
+ tile = tile_service.fetch_data_for_tiles(tile)[0]
+ tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat,
min_lon, max_lon, [tile])
+ tile = tile_service.mask_tiles_to_time_range(start_time,
end_time, tile)
+
+ if min_elevation is not None and max_elevation is not None:
+ tile = tile_service.mask_tiles_to_elevation(min_elevation,
max_elevation, tile)
+
+ if len(tile) == 0:
+ log.info(f'Skipping empty tile {tile_id}')
+ continue
+
+ tile = tile[0]
+
for nexus_point in tile.nexus_point_generator():
point = dict()
@@ -159,15 +206,26 @@ class DataInBoundsSearchCalcHandlerImpl(NexusCalcHandler):
except (KeyError, IndexError):
pass
else:
- point['variable'] = nexus_point.data_vals
+ variables = []
+
+ data_vals = nexus_point.data_vals if tile.is_multi else
[nexus_point.data_vals]
+
+ for value, variable in zip(data_vals, tile.variables):
+ if variable.standard_name:
+ var_name = variable.standard_name
+ else:
+ var_name = variable.variable_name
+
+ variables.append({var_name: value})
+
+ point['variables'] = variables
data.append({
'latitude': nexus_point.latitude,
'longitude': nexus_point.longitude,
'time': nexus_point.time,
- 'data': [
- point
- ]
+ 'elevation': nexus_point.depth,
+ 'data': point
})
if includemeta and len(tiles) > 0:
@@ -178,14 +236,22 @@ class DataInBoundsSearchCalcHandlerImpl(NexusCalcHandler):
result = DataInBoundsResult(
results=data,
stats={},
- meta=meta)
+ meta=meta,
+ compact=compact
+ )
result.extendMeta(min_lat, max_lat, min_lon, max_lon, "", start_time,
end_time)
+ log.info(f'Finished subsetting. Generated {len(data):,} points')
+
return result
class DataInBoundsResult(NexusResults):
+ def __init__(self, results=None, meta=None, stats=None,
computeOptions=None, status_code=200, compact=False, **args):
+ NexusResults.__init__(self, results, meta, stats, computeOptions,
status_code, **args)
+ self.__compact = compact
+
def toCSV(self):
rows = []
@@ -229,7 +295,15 @@ class DataInBoundsResult(NexusResults):
return "\r\n".join(rows)
def toJson(self):
- return json.dumps(self.results(), indent=4, cls=NpEncoder)
+ if not self.__compact:
+ return json.dumps(self.results(), indent=4, cls=NpEncoder)
+ else:
+ buffer = io.BytesIO()
+ with gzip.open(buffer, 'wt', encoding='ascii') as zip:
+ json.dump(self.results(), zip, cls=NpEncoder)
+
+ buffer.seek(0)
+ return buffer.read()
class NpEncoder(json.JSONEncoder):
def default(self, obj):
diff --git a/analysis/webservice/algorithms_spark/ClimMapSpark.py
b/analysis/webservice/algorithms_spark/ClimMapSpark.py
index 8b34324..087cb8e 100644
--- a/analysis/webservice/algorithms_spark/ClimMapSpark.py
+++ b/analysis/webservice/algorithms_spark/ClimMapSpark.py
@@ -248,7 +248,7 @@ class ClimMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
y0, y1, x0, x1))
# Store global map in a NetCDF file.
- self._create_nc_file(a, 'clmap.nc', 'val')
+ # self._create_nc_file(a, 'clmap.nc', 'val')
# Create dict for JSON response
results = [[{'avg': a[y, x], 'cnt': int(n[y, x]),
diff --git a/analysis/webservice/algorithms_spark/CorrMapSpark.py
b/analysis/webservice/algorithms_spark/CorrMapSpark.py
index 71eaf72..23211cb 100644
--- a/analysis/webservice/algorithms_spark/CorrMapSpark.py
+++ b/analysis/webservice/algorithms_spark/CorrMapSpark.py
@@ -296,7 +296,7 @@ class CorrMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
n[y0:y1 + 1, x0:x1 + 1] = tile_cnt
# Store global map in a NetCDF file.
- self._create_nc_file(r, 'corrmap.nc', 'r')
+ # self._create_nc_file(r, 'corrmap.nc', 'r')
# Create dict for JSON response
results = [[{'r': r[y, x], 'cnt': int(n[y, x]),
diff --git
a/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
index 12f7dee..960b6e2 100644
--- a/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
+++ b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
@@ -121,13 +121,21 @@ class
DailyDifferenceAverageNexusImplSpark(NexusCalcSparkHandler):
start_seconds_from_epoch = int((start_time - EPOCH).total_seconds())
end_seconds_from_epoch = int((end_time - EPOCH).total_seconds())
+
+ min_elevation, max_elevation = request.get_elevation_args()
+
+ if (min_elevation and max_elevation) and min_elevation > max_elevation:
+ raise NexusProcessingException(
+ reason='Min elevation must be less than or equal to max
elevation',
+ code=400
+ )
plot = request.get_boolean_arg("plot", default=False)
- return bounding_polygon, dataset, climatology, start_time,
start_seconds_from_epoch, end_time, end_seconds_from_epoch, plot
+ return bounding_polygon, dataset, climatology, start_time,
start_seconds_from_epoch, end_time, end_seconds_from_epoch, plot,
min_elevation, max_elevation
def calc(self, request, **args):
- bounding_polygon, dataset, climatology, start_time,
start_seconds_from_epoch, end_time, end_seconds_from_epoch, plot =
self.parse_arguments(
+ bounding_polygon, dataset, climatology, start_time,
start_seconds_from_epoch, end_time, end_seconds_from_epoch, plot,
min_elevation, max_elevation = self.parse_arguments(
request)
self.log.debug("Querying for tiles in search domain")
@@ -135,6 +143,8 @@ class
DailyDifferenceAverageNexusImplSpark(NexusCalcSparkHandler):
tile_ids = [tile.tile_id for tile in
self._get_tile_service().find_tiles_in_polygon(bounding_polygon, dataset,
start_seconds_from_epoch, end_seconds_from_epoch,
+
min_elevation=min_elevation,
+
max_elevation=max_elevation,
fetch_data=False, fl='id',
sort=['tile_min_time_dt asc', 'tile_min_lon asc',
'tile_min_lat asc'], rows=5000)]
diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py
b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
index e876a11..d0a9663 100644
--- a/analysis/webservice/algorithms_spark/HofMoellerSpark.py
+++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
@@ -194,8 +194,16 @@ class
BaseHoffMoellerSparkHandlerImpl(NexusCalcSparkHandler):
start_seconds_from_epoch = int((start_time - EPOCH).total_seconds())
end_seconds_from_epoch = int((end_time - EPOCH).total_seconds())
normalize_dates = request.get_normalize_dates()
+
+ min_elevation, max_elevation = request.get_elevation_args()
- return ds, bounding_polygon, start_seconds_from_epoch,
end_seconds_from_epoch, normalize_dates
+ if (min_elevation and max_elevation) and min_elevation > max_elevation:
+ raise NexusProcessingException(
+ reason='Min elevation must be less than or equal to max
elevation',
+ code=400
+ )
+
+ return ds, bounding_polygon, start_seconds_from_epoch,
end_seconds_from_epoch, normalize_dates, min_elevation, max_elevation
def applyDeseasonToHofMoellerByField(self, results, pivot="lats",
field="mean", append=True):
shape = (len(results), len(results[0][pivot]))
@@ -345,7 +353,7 @@ class
LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl):
BaseHoffMoellerSparkHandlerImpl.__init__(self, **kwargs)
def calc(self, compute_options, **args):
- ds, bbox, start_time, end_time, normalize_dates =
self.parse_arguments(compute_options)
+ ds, bbox, start_time, end_time, normalize_dates, min_elevation,
max_elevation = self.parse_arguments(compute_options)
metrics_record = self._create_metrics_record()
calculation_start = datetime.now()
@@ -354,7 +362,8 @@ class
LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl):
nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat,
min_lon, max_lon, tile.dataset) for x, tile in
enumerate(self._get_tile_service().find_tiles_in_box(min_lat, max_lat, min_lon,
max_lon,
-
ds, start_time, end_time,
+
ds, start_time, end_time, min_elevation=min_elevation,
+
max_elevation=max_elevation,
metrics_callback=metrics_record.record_metrics,
fetch_data=False))]
@@ -401,7 +410,7 @@ class
LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl):
BaseHoffMoellerSparkHandlerImpl.__init__(self, **kwargs)
def calc(self, compute_options, **args):
- ds, bbox, start_time, end_time, normalize_dates =
self.parse_arguments(compute_options)
+ ds, bbox, start_time, end_time, normalize_dates, min_elevation,
max_elevation = self.parse_arguments(compute_options)
metrics_record = self._create_metrics_record()
calculation_start = datetime.now()
@@ -410,7 +419,8 @@ class
LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl):
nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat,
min_lon, max_lon, tile.dataset) for x, tile in
enumerate(self._get_tile_service().find_tiles_in_box(min_lat, max_lat, min_lon,
max_lon,
-
ds, start_time, end_time,
+
ds, start_time, end_time, min_elevation=min_elevation,
+
max_elevation=max_elevation,
metrics_callback=metrics_record.record_metrics,
fetch_data=False))]
diff --git a/analysis/webservice/algorithms_spark/MaximaMinimaSpark.py
b/analysis/webservice/algorithms_spark/MaximaMinimaSpark.py
index 7521d37..cebce99 100644
--- a/analysis/webservice/algorithms_spark/MaximaMinimaSpark.py
+++ b/analysis/webservice/algorithms_spark/MaximaMinimaSpark.py
@@ -127,8 +127,16 @@ class MaximaMinimaSparkHandlerImpl(NexusCalcSparkHandler):
start_seconds_from_epoch = int((start_time - EPOCH).total_seconds())
end_seconds_from_epoch = int((end_time - EPOCH).total_seconds())
+
+ min_elevation, max_elevation = request.get_elevation_args()
- return ds, bounding_polygon, start_seconds_from_epoch,
end_seconds_from_epoch, nparts_requested
+ if (min_elevation and max_elevation) and min_elevation > max_elevation:
+ raise NexusProcessingException(
+ reason='Min elevation must be less than or equal to max
elevation',
+ code=400
+ )
+
+ return ds, bounding_polygon, start_seconds_from_epoch,
end_seconds_from_epoch, nparts_requested, min_elevation, max_elevation
def calc(self, compute_options, **args):
@@ -139,7 +147,7 @@ class MaximaMinimaSparkHandlerImpl(NexusCalcSparkHandler):
:return:
"""
- ds, bbox, start_time, end_time, nparts_requested =
self.parse_arguments(compute_options)
+ ds, bbox, start_time, end_time, nparts_requested, min_elevation,
max_elevation = self.parse_arguments(compute_options)
self._setQueryParams(ds,
(float(bbox.bounds[1]),
float(bbox.bounds[3]),
@@ -180,9 +188,9 @@ class MaximaMinimaSparkHandlerImpl(NexusCalcSparkHandler):
self._maxLonCent))
# Create array of tuples to pass to Spark map function
- nexus_tiles_spark = [[self._find_tile_bounds(t),
+ nexus_tiles_spark = np.array([[self._find_tile_bounds(t),
self._startTime, self._endTime,
- self._ds] for t in nexus_tiles]
+ self._ds] for t in nexus_tiles], dtype='object')
# Remove empty tiles (should have bounds set to None)
bad_tile_inds = np.where([t[0] is None for t in nexus_tiles_spark])[0]
@@ -205,7 +213,7 @@ class MaximaMinimaSparkHandlerImpl(NexusCalcSparkHandler):
self.log.info('Using {} partitions'.format(spark_nparts))
rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
- max_min_part = rdd.map(partial(self._map, self._tile_service_factory))
+ max_min_part = rdd.map(partial(self._map, self._tile_service_factory,
min_elevation, max_elevation))
max_min_count = \
max_min_part.combineByKey(lambda val: val,
lambda x, val: (np.maximum(x[0],
val[0]), # Max
@@ -281,7 +289,7 @@ class MaximaMinimaSparkHandlerImpl(NexusCalcSparkHandler):
# this operates on only one nexus tile bound over time. Can assume all
nexus_tiles are the same shape
@staticmethod
- def _map(tile_service_factory, tile_in_spark):
+ def _map(tile_service_factory, min_elevation, max_elevation,
tile_in_spark):
# tile_in_spark is a spatial tile that corresponds to nexus tiles of
the same area
tile_bounds = tile_in_spark[0]
(min_lat, max_lat, min_lon, max_lon,
@@ -313,7 +321,9 @@ class MaximaMinimaSparkHandlerImpl(NexusCalcSparkHandler):
min_lon, max_lon,
ds=ds,
start_time=t_start,
- end_time=t_end)
+ end_time=t_end,
+
min_elevation=min_elevation,
+
max_elevation=max_elevation)
for tile in nexus_tiles:
# Take max and min of the data - just used the masked arrays,
no extra .data
diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
index 750ba59..97aaa14 100644
--- a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
@@ -115,8 +115,16 @@ class
TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
start_seconds_from_epoch = int((start_time - EPOCH).total_seconds())
end_seconds_from_epoch = int((end_time - EPOCH).total_seconds())
+
+ min_elevation, max_elevation = request.get_elevation_args()
- return ds, bounding_polygon, start_seconds_from_epoch,
end_seconds_from_epoch, nparts_requested
+ if (min_elevation and max_elevation) and min_elevation > max_elevation:
+ raise NexusProcessingException(
+ reason='Min elevation must be less than or equal to max
elevation',
+ code=400
+ )
+
+ return ds, bounding_polygon, start_seconds_from_epoch,
end_seconds_from_epoch, nparts_requested, min_elevation, max_elevation
def calc(self, compute_options, **args):
"""
@@ -129,7 +137,7 @@ class
TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
metrics_record = self._create_metrics_record()
- ds, bbox, start_time, end_time, nparts_requested =
self.parse_arguments(compute_options)
+ ds, bbox, start_time, end_time, nparts_requested, min_elevation,
max_elevation = self.parse_arguments(compute_options)
self._setQueryParams(ds,
(float(bbox.bounds[1]),
float(bbox.bounds[3]),
@@ -197,7 +205,7 @@ class
TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
metrics_record.record_metrics(partitions=rdd.getNumPartitions())
- sum_count_part = rdd.map(partial(self._map,
self._tile_service_factory, metrics_record.record_metrics))
+ sum_count_part = rdd.map(partial(self._map,
self._tile_service_factory, metrics_record.record_metrics, min_elevation,
max_elevation))
reduce_duration = 0
reduce_start = datetime.now()
sum_count = sum_count_part.combineByKey(lambda val: val,
@@ -263,7 +271,7 @@ class
TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
endTime=end_time)
@staticmethod
- def _map(tile_service_factory, metrics_callback, tile_in_spark):
+ def _map(tile_service_factory, metrics_callback, min_elevation,
max_elevation, tile_in_spark):
tile_bounds = tile_in_spark[0]
(min_lat, max_lat, min_lon, max_lon,
min_y, max_y, min_x, max_x) = tile_bounds
@@ -290,6 +298,8 @@ class
TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
ds=ds,
start_time=t_start,
end_time=t_end,
+
min_elevation=min_elevation,
+
max_elevation=max_elevation,
metrics_callback=metrics_callback)
calculation_start = datetime.now()
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index 804d3ec..087e2e6 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -157,7 +157,16 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
nparts_requested = request.get_nparts()
normalize_dates = request.get_normalize_dates()
- return ds, bounding_polygon, start_seconds_from_epoch,
end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter,
nparts_requested, normalize_dates
+ min_elevation, max_elevation = request.get_elevation_args()
+
+ if (min_elevation and max_elevation) and min_elevation > max_elevation:
+ raise NexusProcessingException(
+ reason='Min elevation must be less than or equal to max
elevation',
+ code=400
+ )
+
+ return ds, bounding_polygon, start_seconds_from_epoch,
end_seconds_from_epoch, apply_seasonal_cycle_filter, \
+ apply_low_pass_filter, nparts_requested, normalize_dates,
min_elevation, max_elevation
def calc(self, request, **args):
"""
@@ -167,8 +176,8 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
:return:
"""
start_time = datetime.now()
- ds, bounding_polygon, start_seconds_from_epoch,
end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter,
nparts_requested, normalize_dates = self.parse_arguments(
- request)
+ ds, bounding_polygon, start_seconds_from_epoch,
end_seconds_from_epoch, apply_seasonal_cycle_filter, \
+ apply_low_pass_filter, nparts_requested, normalize_dates,
min_elevation, max_elevation = self.parse_arguments(request)
metrics_record = self._create_metrics_record()
resultsRaw = []
@@ -201,6 +210,7 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
metrics_record.record_metrics,
normalize_dates,
spark_nparts=spark_nparts,
+ min_elevation=min_elevation,
max_elevation=max_elevation,
sc=self._sc)
if apply_seasonal_cycle_filter:
@@ -223,6 +233,7 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
self._tile_service_factory,
metrics_record.record_metrics,
normalize_dates=False,
+ min_elevation=min_elevation,
max_elevation=max_elevation,
spark_nparts=spark_nparts,
sc=self._sc)
clim_indexed_by_month =
{datetime.utcfromtimestamp(result['time']).month: result for result in
results_clim}
@@ -258,8 +269,7 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
"LowPass filter calculation took %s for dataset %s" %
(str(datetime.now() - the_time), shortName))
the_time = datetime.now()
- self._create_nc_file_time1d(np.array(results), 'ts.nc', 'mean',
- fill=-9999.)
+ # self._create_nc_file_time1d(np.array(results), 'ts.nc', 'mean',
fill=-9999.)
self.log.info(
"NetCDF generation took %s for dataset %s" %
(str(datetime.now() - the_time), shortName))
@@ -447,7 +457,7 @@ class TimeSeriesResults(NexusResults):
def spark_driver(daysinrange, bounding_polygon, ds, tile_service_factory,
metrics_callback, normalize_dates, fill=-9999.,
- spark_nparts=1, sc=None):
+ spark_nparts=1, min_elevation=None, max_elevation=None,
sc=None):
nexus_tiles_spark = [(bounding_polygon, ds,
list(daysinrange_part), fill)
for daysinrange_part
@@ -456,14 +466,14 @@ def spark_driver(daysinrange, bounding_polygon, ds,
tile_service_factory, metric
# Launch Spark computations
rdd = sc.parallelize(nexus_tiles_spark, spark_nparts)
metrics_callback(partitions=rdd.getNumPartitions())
- results = rdd.flatMap(partial(calc_average_on_day, tile_service_factory,
metrics_callback, normalize_dates)).collect()
+ results = rdd.flatMap(partial(calc_average_on_day, tile_service_factory,
metrics_callback, normalize_dates, min_elevation, max_elevation)).collect()
results = list(itertools.chain.from_iterable(results))
results = sorted(results, key=lambda entry: entry["time"])
return results, {}
-def calc_average_on_day(tile_service_factory, metrics_callback,
normalize_dates, tile_in_spark):
+def calc_average_on_day(tile_service_factory, metrics_callback,
normalize_dates, min_elevation, max_elevation, tile_in_spark):
import shapely.wkt
from datetime import datetime
from pytz import timezone
@@ -474,6 +484,8 @@ def calc_average_on_day(tile_service_factory,
metrics_callback, normalize_dates,
return []
tile_service = tile_service_factory()
+ logger.info(f'{max_elevation=} {min_elevation=}')
+
ds1_nexus_tiles = \
tile_service.get_tiles_bounded_by_box(bounding_polygon.bounds[1],
bounding_polygon.bounds[3],
@@ -483,9 +495,11 @@ def calc_average_on_day(tile_service_factory,
metrics_callback, normalize_dates,
timestamps[0],
timestamps[-1],
rows=5000,
+ min_elevation=min_elevation,
+ max_elevation=max_elevation,
metrics_callback=metrics_callback,
distinct=True)
-
+
calculation_start = datetime.now()
tile_dict = {}
diff --git a/analysis/webservice/algorithms_spark/VarianceSpark.py
b/analysis/webservice/algorithms_spark/VarianceSpark.py
index 7c217da..372385a 100644
--- a/analysis/webservice/algorithms_spark/VarianceSpark.py
+++ b/analysis/webservice/algorithms_spark/VarianceSpark.py
@@ -129,8 +129,16 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler):
start_seconds_from_epoch = int((start_time - EPOCH).total_seconds())
end_seconds_from_epoch = int((end_time - EPOCH).total_seconds())
+
+ min_elevation, max_elevation = request.get_elevation_args()
- return ds, bounding_polygon, start_seconds_from_epoch,
end_seconds_from_epoch, nparts_requested
+ if (min_elevation and max_elevation) and min_elevation > max_elevation:
+ raise NexusProcessingException(
+ reason='Min elevation must be less than or equal to max
elevation',
+ code=400
+ )
+
+ return ds, bounding_polygon, start_seconds_from_epoch,
end_seconds_from_epoch, nparts_requested, min_elevation, max_elevation
def calc(self, compute_options, **args):
"""
@@ -140,7 +148,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler):
:return:
"""
- ds, bbox, start_time, end_time, nparts_requested =
self.parse_arguments(compute_options)
+ ds, bbox, start_time, end_time, nparts_requested, min_elevation,
max_elevation = self.parse_arguments(compute_options)
self._setQueryParams(ds,
(float(bbox.bounds[1]),
float(bbox.bounds[3]),
@@ -205,7 +213,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler):
self.log.info('Using {} partitions'.format(spark_nparts))
rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
- sum_count_part = rdd.map(partial(self._map,
self._tile_service_factory))
+ sum_count_part = rdd.map(partial(self._map,
self._tile_service_factory, min_elevation, max_elevation))
sum_count = \
sum_count_part.combineByKey(lambda val: val,
lambda x, val: (x[0] + val[0],
@@ -233,7 +241,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler):
self.log.info('Using {} partitions'.format(spark_nparts))
rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
- anomaly_squared_part = rdd.map(partial(self._calc_variance,
self._tile_service_factory))
+ anomaly_squared_part = rdd.map(partial(self._calc_variance,
self._tile_service_factory, min_elevation, max_elevation))
anomaly_squared = \
anomaly_squared_part.combineByKey(lambda val: val,
lambda x, val: (x[0] + val[0],
@@ -287,7 +295,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler):
y0, y1, x0, x1))
# Store global map in a NetCDF file.
- self._create_nc_file(a, 'tam.nc', 'val', fill=self._fill)
+ # self._create_nc_file(a, 'tam.nc', 'val', fill=self._fill)
# Create dict for JSON response
results = [[{'variance': a[y, x], 'cnt': int(n[y, x]),
@@ -301,7 +309,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler):
endTime=end_time)
@staticmethod
- def _map(tile_service_factory, tile_in_spark):
+ def _map(tile_service_factory, min_elevation, max_elevation,
tile_in_spark):
# tile_in_spark is a spatial tile that corresponds to nexus tiles of
the same area
tile_bounds = tile_in_spark[0]
(min_lat, max_lat, min_lon, max_lon,
@@ -328,7 +336,9 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler):
min_lon, max_lon,
ds=ds,
start_time=t_start,
- end_time=t_end)
+ end_time=t_end,
+
min_elevation=min_elevation,
+
max_elevation=max_elevation)
for tile in nexus_tiles:
# Taking the data, converted masked nans to 0
@@ -343,7 +353,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler):
return tile_bounds, (sum_tile, cnt_tile)
@staticmethod
- def _calc_variance(tile_service_factory, tile_in_spark):
+ def _calc_variance(tile_service_factory, min_elevation, max_elevation,
tile_in_spark):
# tile_in_spark is a spatial tile that corresponds to nexus tiles of
the same area
tile_bounds = tile_in_spark[0]
(min_lat, max_lat, min_lon, max_lon,
@@ -375,7 +385,9 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler):
min_lon, max_lon,
ds=ds,
start_time=t_start,
- end_time=t_end)
+ end_time=t_end,
+
min_elevation=min_elevation,
+
max_elevation=max_elevation)
for tile in nexus_tiles:
# Taking the data, converted masked nans to 0
diff --git a/analysis/webservice/apidocs/openapi.yml
b/analysis/webservice/apidocs/openapi.yml
index 8086b38..b274325 100644
--- a/analysis/webservice/apidocs/openapi.yml
+++ b/analysis/webservice/apidocs/openapi.yml
@@ -511,6 +511,15 @@ paths:
schema:
type: string
example: -45,15,-30,30
+ - $ref: '#/components/parameters/MinDepth'
+ - $ref: '#/components/parameters/MaxDepth'
+ - $ref: '#/components/parameters/Depth'
+ - $ref: '#/components/parameters/MinHeight'
+ - $ref: '#/components/parameters/MaxHeight'
+ - $ref: '#/components/parameters/Height'
+ - $ref: '#/components/parameters/MinElevation'
+ - $ref: '#/components/parameters/MaxElevation'
+ - $ref: '#/components/parameters/Elevation'
responses:
"200":
description: Successful operation
@@ -684,6 +693,15 @@ paths:
schema:
type: string
example: "-180,-90,180,90"
+ - $ref: '#/components/parameters/MinDepth'
+ - $ref: '#/components/parameters/MaxDepth'
+ - $ref: '#/components/parameters/Depth'
+ - $ref: '#/components/parameters/MinHeight'
+ - $ref: '#/components/parameters/MaxHeight'
+ - $ref: '#/components/parameters/Height'
+ - $ref: '#/components/parameters/MinElevation'
+ - $ref: '#/components/parameters/MaxElevation'
+ - $ref: '#/components/parameters/Elevation'
responses:
"200":
description: Successful operation
@@ -757,6 +775,15 @@ paths:
schema:
type: boolean
default: true
+ - $ref: '#/components/parameters/MinDepth'
+ - $ref: '#/components/parameters/MaxDepth'
+ - $ref: '#/components/parameters/Depth'
+ - $ref: '#/components/parameters/MinHeight'
+ - $ref: '#/components/parameters/MaxHeight'
+ - $ref: '#/components/parameters/Height'
+ - $ref: '#/components/parameters/MinElevation'
+ - $ref: '#/components/parameters/MaxElevation'
+ - $ref: '#/components/parameters/Elevation'
responses:
"200":
description: Successful operation
@@ -821,6 +848,15 @@ paths:
schema:
type: string
example: "-180,-90,180,90"
+ - $ref: '#/components/parameters/MinDepth'
+ - $ref: '#/components/parameters/MaxDepth'
+ - $ref: '#/components/parameters/Depth'
+ - $ref: '#/components/parameters/MinHeight'
+ - $ref: '#/components/parameters/MaxHeight'
+ - $ref: '#/components/parameters/Height'
+ - $ref: '#/components/parameters/MinElevation'
+ - $ref: '#/components/parameters/MaxElevation'
+ - $ref: '#/components/parameters/Elevation'
responses:
"200":
description: Successful operation
@@ -880,6 +916,15 @@ paths:
schema:
type: string
example: "-180,-90,180,90"
+ - $ref: '#/components/parameters/MinDepth'
+ - $ref: '#/components/parameters/MaxDepth'
+ - $ref: '#/components/parameters/Depth'
+ - $ref: '#/components/parameters/MinHeight'
+ - $ref: '#/components/parameters/MaxHeight'
+ - $ref: '#/components/parameters/Height'
+ - $ref: '#/components/parameters/MinElevation'
+ - $ref: '#/components/parameters/MaxElevation'
+ - $ref: '#/components/parameters/Elevation'
responses:
"200":
description: Successful operation
@@ -939,6 +984,15 @@ paths:
schema:
type: string
example: "-180,-90,180,90"
+ - $ref: '#/components/parameters/MinDepth'
+ - $ref: '#/components/parameters/MaxDepth'
+ - $ref: '#/components/parameters/Depth'
+ - $ref: '#/components/parameters/MinHeight'
+ - $ref: '#/components/parameters/MaxHeight'
+ - $ref: '#/components/parameters/Height'
+ - $ref: '#/components/parameters/MinElevation'
+ - $ref: '#/components/parameters/MaxElevation'
+ - $ref: '#/components/parameters/Elevation'
responses:
"200":
description: Successful operation
@@ -998,6 +1052,15 @@ paths:
schema:
type: string
example: "-180,-90,180,90"
+ - $ref: '#/components/parameters/MinDepth'
+ - $ref: '#/components/parameters/MaxDepth'
+ - $ref: '#/components/parameters/Depth'
+ - $ref: '#/components/parameters/MinHeight'
+ - $ref: '#/components/parameters/MaxHeight'
+ - $ref: '#/components/parameters/Height'
+ - $ref: '#/components/parameters/MinElevation'
+ - $ref: '#/components/parameters/MaxElevation'
+ - $ref: '#/components/parameters/Elevation'
responses:
"200":
description: Successful operation
@@ -1057,6 +1120,15 @@ paths:
schema:
type: string
example: "-180,-90,180,90"
+ - $ref: '#/components/parameters/MinDepth'
+ - $ref: '#/components/parameters/MaxDepth'
+ - $ref: '#/components/parameters/Depth'
+ - $ref: '#/components/parameters/MinHeight'
+ - $ref: '#/components/parameters/MaxHeight'
+ - $ref: '#/components/parameters/Height'
+ - $ref: '#/components/parameters/MinElevation'
+ - $ref: '#/components/parameters/MaxElevation'
+ - $ref: '#/components/parameters/Elevation'
responses:
"200":
description: Successful operation
@@ -1222,6 +1294,78 @@ components:
- 48
example: 30
explode: false
+ MinDepth:
+ in: query
+ name: minDepth
+ description: Minimum depth where positive values correspond to distance
below surface. Must be less than maxDepth if maxDepth provided
+ required: false
+ schema:
+ type: number
+ example: 0
+ MaxDepth:
+ in: query
+ name: maxDepth
+ description: Maximum depth where positive values correspond to distance
below surface. Must be greater than minDepth if minDepth provided
+ required: false
+ schema:
+ type: number
+ example: 10
+ Depth:
+ in: query
+ name: depth
+ description: Depth where positive values correspond to distance below
surface
+ required: false
+ schema:
+ type: number
+ example: 5
+ MinHeight:
+ in: query
+ name: minHeight
+ description: Minimum height above surface. Must be less than maxHeight
if maxHeight provided
+ required: false
+ schema:
+ type: number
+ example: 0
+ MaxHeight:
+ in: query
+ name: maxHeight
+ description: Maximum height above surface. Must be greater than
minHeight if minHeight provided
+ required: false
+ schema:
+ type: number
+ example: 10
+ Height:
+ in: query
+ name: height
+ description: Height above surface
+ required: false
+ schema:
+ type: number
+ example: 5
+ MinElevation:
+ in: query
+ name: minElevation
+ description: Minimum elevation above surface. Must be less than
maxElevation if maxElevation provided
+ required: false
+ schema:
+ type: number
+ example: 0
+ MaxElevation:
+ in: query
+ name: maxElevation
+ description: Maximum elevation above surface. Must be greater than
minElevation if minElevation provided
+ required: false
+ schema:
+ type: number
+ example: 10
+ Elevation:
+ in: query
+ name: elevation
+ description: Elevation above surface
+ required: false
+ schema:
+ type: number
+ example: 5
schemas:
DomsQueryResult:
type: object
diff --git
a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
index 7a55967..b95716a 100644
--- a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
+++ b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py
@@ -37,10 +37,10 @@ class NexusRequestHandler(tornado.web.RequestHandler):
# temporary hack to use a NexusRequestObject without tornado request
references
# this object only supports timeAvgMapSpark yet.
# Will be extended to replace the historical object in the next pull
request related to ticket SDAP-252
- if self.request.path == '/timeAvgMapSpark':
- request = NexusRequestObjectTornadoFree(self)
- else:
- request = NexusRequestObject(self)
+ # if self.request.path == '/timeAvgMapSpark':
+ # request = NexusRequestObjectTornadoFree(self)
+ # else:
+ request = NexusRequestObject(self)
# create NexusCalcHandler which will process the request
instance = self.__clazz(**self._clazz_init_args)
diff --git
a/analysis/webservice/nexus_tornado/request/renderers/NexusJSONRenderer.py
b/analysis/webservice/nexus_tornado/request/renderers/NexusJSONRenderer.py
index 3e6308f..5d828e5 100644
--- a/analysis/webservice/nexus_tornado/request/renderers/NexusJSONRenderer.py
+++ b/analysis/webservice/nexus_tornado/request/renderers/NexusJSONRenderer.py
@@ -16,6 +16,9 @@
import sys
import traceback
import json
+import logging
+
+logger = logging.getLogger(__name__)
class NexusJSONRenderer(object):
@@ -23,9 +26,19 @@ class NexusJSONRenderer(object):
self.request = nexus_request
def render(self, tornado_handler, result):
+ logger.info('Rendering JSON result')
+
tornado_handler.set_header("Content-Type", "application/json")
try:
result_str = result.toJson()
+
+ if isinstance(result_str, bytes):
+ tornado_handler.set_header("Content-Type", "application/gzip")
+ tornado_handler.set_header("Content-Disposition",
+ "attachment; filename=\"%s\"" %
self.request.get_argument('filename',
+
"subset.gz"))
+ logger.info('Writing result')
+
tornado_handler.write(result_str)
tornado_handler.finish()
except AttributeError:
diff --git a/analysis/webservice/webmodel/NexusRequestObject.py
b/analysis/webservice/webmodel/NexusRequestObject.py
index 1896236..8b10886 100644
--- a/analysis/webservice/webmodel/NexusRequestObject.py
+++ b/analysis/webservice/webmodel/NexusRequestObject.py
@@ -17,6 +17,7 @@ import logging
import re
from datetime import datetime
from decimal import Decimal
+from typing import Optional, Tuple
from pytz import UTC
from shapely.geometry import Polygon
@@ -114,6 +115,44 @@ class NexusRequestObject(StatsComputeOptions):
def get_min_lon(self, default=Decimal(-180)):
return self.get_decimal_arg("minLon", default)
def get_elevation_args(self) -> Tuple[Optional[float], Optional[float]]:
    """
    Extract the requested elevation bounds from the query arguments.

    Accepts either a single-value argument (depth, height, elevation) or a
    min/max pair (minDepth/maxDepth, minHeight/maxHeight,
    minElevation/maxElevation). Depth is positive-down, so depth values are
    negated into elevation (positive-up) space.

    :return: (min_elevation, max_elevation); either entry may be None if no
             corresponding argument was provided
    :raises ValueError: if the resolved max elevation is less than the min
                        elevation, unless depth args were used (negation
                        inverts their ordering, so the bounds are swapped
                        instead)
    """
    min_depth = self.get_float_arg('minDepth', None)
    max_depth = self.get_float_arg('maxDepth', None)
    depth = self.get_float_arg('depth', None)

    min_height = self.get_float_arg('minHeight', None)
    max_height = self.get_float_arg('maxHeight', None)
    height = self.get_float_arg('height', None)

    min_elevation = self.get_float_arg('minElevation', None)
    max_elevation = self.get_float_arg('maxElevation', None)
    elevation = self.get_float_arg('elevation', None)

    # Single-value arguments pin both bounds to the same level.
    if depth is not None:
        return -depth, -depth
    elif height is not None:
        return height, height
    elif elevation is not None:
        return elevation, elevation

    # Min/max arguments: take the first one provided. Explicit `is not None`
    # checks (rather than `or` chaining) so a legitimate value of 0 is not
    # skipped as falsy.
    if min_elevation is not None:
        ret_min = min_elevation
    elif min_height is not None:
        ret_min = min_height
    elif min_depth is not None:
        ret_min = -min_depth
    else:
        ret_min = None

    if max_elevation is not None:
        ret_max = max_elevation
    elif max_height is not None:
        ret_max = max_height
    elif max_depth is not None:
        ret_max = -max_depth
    else:
        ret_max = None

    # Validate max >= min; depth args invert ordering under negation, so
    # swap rather than error in that case.
    if ret_max is not None and ret_min is not None and ret_max < ret_min:
        if min_depth is not None or max_depth is not None:
            ret_max, ret_min = ret_min, ret_max
        else:
            raise ValueError(f'Request max elevation less than min elevation: {ret_max} < {ret_min}')

    return ret_min, ret_max
+
    # added to fit the simplified version of TimeAvgMapSpark parse_argument
def get_bounding_box(self):
diff --git a/data-access/nexustiles/backends/nexusproto/backend.py
b/data-access/nexustiles/backends/nexusproto/backend.py
index 690b109..2c99dad 100644
--- a/data-access/nexustiles/backends/nexusproto/backend.py
+++ b/data-access/nexustiles/backends/nexusproto/backend.py
@@ -428,6 +428,11 @@ class NexusprotoTileService(AbstractTileService):
a_tile.meta_data = meta
a_tile.is_multi = is_multi_var
+ elevation = tile_data_by_id[a_tile.tile_id].get_elevation_array()
+
+ if elevation is not None:
+ a_tile.elevation = np.broadcast_arrays(elevation, data)[0]
+
del (tile_data_by_id[a_tile.tile_id])
return tiles
diff --git a/data-access/nexustiles/backends/nexusproto/dao/CassandraProxy.py
b/data-access/nexustiles/backends/nexusproto/dao/CassandraProxy.py
index 96f7c4c..8769433 100644
--- a/data-access/nexustiles/backends/nexusproto/dao/CassandraProxy.py
+++ b/data-access/nexustiles/backends/nexusproto/dao/CassandraProxy.py
@@ -209,6 +209,38 @@ class NexusTileData(Model):
else:
raise NotImplementedError("Only supports grid_tile, swath_tile,
swath_multi_variable_tile, and time_series_tile")
def get_elevation_array(self):
    """
    Return the tile's elevation values as a masked array, or None when the
    tile carries no elevation field.

    For swath tiles the elevation array is passed through
    ``_to_standard_index`` so it lines up with the standardized
    (time, lat, lon) layout of the variable data.
    """
    # Fetch the protobuf tile once rather than re-deserializing for each
    # attribute access.
    nexus_tile = self._get_nexus_tile()
    tile_type = nexus_tile.WhichOneof("tile_type")
    tile_data = getattr(nexus_tile, tile_type)

    if not tile_data.HasField('elevation'):
        return None

    elevation_data = np.ma.masked_invalid(from_shaped_array(tile_data.elevation))

    if tile_type in ['swath_tile', 'swath_multi_variable_tile']:
        latitude_data = np.ma.masked_invalid(from_shaped_array(tile_data.latitude)).reshape(-1)
        longitude_data = np.ma.masked_invalid(from_shaped_array(tile_data.longitude)).reshape(-1)
        time_data = np.ma.masked_invalid(from_shaped_array(tile_data.time)).reshape(-1)

        # Collapse time to a single value when every sample shares it,
        # mirroring how the variable data shapes are standardized.
        if np.all(time_data == np.min(time_data)):
            time_data = np.array([np.min(time_data)])

        desired_shape = (
            len(time_data),
            len(latitude_data),
            len(longitude_data),
        )

        elevation_data = self._to_standard_index(
            elevation_data,
            desired_shape,
            tile_type == 'swath_multi_variable_tile'
        )

    return elevation_data
+
+
@staticmethod
def _to_standard_index(data_array, desired_shape, is_multi_var=False):
"""
diff --git a/data-access/nexustiles/backends/nexusproto/dao/SolrProxy.py
b/data-access/nexustiles/backends/nexusproto/dao/SolrProxy.py
index e466ad4..e99df92 100644
--- a/data-access/nexustiles/backends/nexusproto/dao/SolrProxy.py
+++ b/data-access/nexustiles/backends/nexusproto/dao/SolrProxy.py
@@ -332,6 +332,12 @@ class SolrProxy(object):
search_start_s, search_end_s
)
additionalparams['fq'].append(time_clause)
+
+ min_elevation = kwargs.get('min_elevation', None)
+ max_elevation = kwargs.get('max_elevation', None)
+
+ elevation_clause = self.get_elevation_clause(min_elevation,
max_elevation)
+ additionalparams['fq'].append(elevation_clause)
self._merge_kwargs(additionalparams, **kwargs)
@@ -366,6 +372,12 @@ class SolrProxy(object):
search_start_s, search_end_s
)
additionalparams['fq'].append(time_clause)
+
+ min_elevation = kwargs['min_elevation'] if 'min_elevation' in kwargs
else None
+ max_elevation = kwargs['max_elevation'] if 'max_elevation' in kwargs
else None
+
+ elevation_clause = self.get_elevation_clause(min_elevation,
max_elevation)
+ additionalparams['fq'].append(elevation_clause)
self._merge_kwargs(additionalparams, **kwargs)
@@ -400,7 +412,13 @@ class SolrProxy(object):
search_start_s, search_end_s
)
additionalparams['fq'].append(time_clause)
-
+
+ min_elevation = kwargs['min_elevation'] if 'min_elevation' in kwargs
else None
+ max_elevation = kwargs['max_elevation'] if 'max_elevation' in kwargs
else None
+
+ elevation_clause = self.get_elevation_clause(min_elevation,
max_elevation)
+ additionalparams['fq'].append(elevation_clause)
+
self._merge_kwargs(additionalparams, **kwargs)
return self.do_query_all(
@@ -619,7 +637,26 @@ class SolrProxy(object):
search_start_s, search_end_s
)
return time_clause
-
+
def get_elevation_clause(self, min_elevation, max_elevation):
    """
    Build a Solr fq clause selecting tiles by elevation range.

    Cases:
    - min == max: tiles whose [tile_min_elevation_d, tile_max_elevation_d]
      range contains that single value
    - min and max given: tiles whose elevation range overlaps the requested
      range (either endpoint falls inside it, or the tile spans it)
    - only min: tiles entirely at or above min
    - only max: tiles entirely at or below max
    - neither: tiles with no elevation data at all, or surface tiles whose
      elevation is exactly 0
    """
    if min_elevation is not None and max_elevation is not None:
        if min_elevation == max_elevation:
            elevation_clause = f"(tile_min_elevation_d:[* TO {min_elevation}] AND tile_max_elevation_d:[{max_elevation} TO *])"
        else:
            elevation_clause = (f"(tile_min_elevation_d:[{min_elevation} TO {max_elevation}] OR "
                                f"tile_max_elevation_d:[{min_elevation} TO {max_elevation}] OR "
                                f"(tile_min_elevation_d:[* TO {min_elevation}] AND tile_max_elevation_d:[{max_elevation} TO *]))")
    elif min_elevation is not None:
        elevation_clause = (f"(tile_min_elevation_d:[{min_elevation} TO *] AND "
                            f"tile_max_elevation_d:[{min_elevation} TO *])")
    elif max_elevation is not None:
        elevation_clause = (f"(tile_min_elevation_d:[* TO {max_elevation}] AND "
                            f"tile_max_elevation_d:[* TO {max_elevation}])")
    else:
        # Constant string; no placeholders, so no f-prefix needed.
        elevation_clause = ("((*:* -tile_min_elevation_d:[* TO *]) OR "
                            "(tile_min_elevation_d:[0 TO 0] OR tile_max_elevation_d:[0 TO 0]))")
    return elevation_clause
+
def get_tile_count(self, ds, bounding_polygon=None, start_time=0,
end_time=-1, metadata=None, **kwargs):
"""
Return number of tiles that match search criteria.
diff --git a/data-access/nexustiles/model/nexusmodel.py
b/data-access/nexustiles/model/nexusmodel.py
index 7db4d61..e679be4 100644
--- a/data-access/nexustiles/model/nexusmodel.py
+++ b/data-access/nexustiles/model/nexusmodel.py
@@ -87,6 +87,7 @@ class Tile(object):
variables: list = None
latitudes: np.array = None
longitudes: np.array = None
+ elevation: np.array = None
times: np.array = None
data: np.array = None
is_multi: bool = None
@@ -137,7 +138,13 @@ class Tile(object):
data_vals = [data[index] for data in self.data]
else:
data_vals = self.data[index]
- point = NexusPoint(lat, lon, None, time, index, data_vals)
+
+ if self.elevation is not None:
+ elevation = self.elevation[index]
+ else:
+ elevation = np.nan
+
+ point = NexusPoint(lat, lon, elevation, time, index, data_vals)
yield point
else:
for index in indices:
@@ -149,7 +156,13 @@ class Tile(object):
data_vals = [data[index] for data in self.data]
else:
data_vals = self.data[index]
- point = NexusPoint(lat, lon, None, time, index, data_vals)
+
+ if self.elevation is not None:
+ elevation = self.elevation[index]
+ else:
+ elevation = np.nan
+
+ point = NexusPoint(lat, lon, elevation, time, index, data_vals)
yield point
def get_indices(self, include_nan=False):
diff --git a/data-access/nexustiles/nexustiles.py
b/data-access/nexustiles/nexustiles.py
index 408799b..3ce3c28 100644
--- a/data-access/nexustiles/nexustiles.py
+++ b/data-access/nexustiles/nexustiles.py
@@ -506,6 +506,13 @@ class NexusTileService:
if 0 <= start_time <= end_time:
tiles = self.mask_tiles_to_time_range(start_time, end_time, tiles)
+ if 'min_elevation' in kwargs or 'max_elevation' in kwargs:
+ tiles = self.mask_tiles_to_elevation(
+ kwargs.get('min_elevation'),
+ kwargs.get('max_elevation'),
+ tiles
+ )
+
return tiles
def get_tiles_bounded_by_polygon(self, polygon, ds=None, start_time=0,
end_time=-1, **kwargs):
@@ -515,6 +522,13 @@ class NexusTileService:
if 0 <= start_time <= end_time:
tiles = self.mask_tiles_to_time_range(start_time, end_time, tiles)
+ if 'min_elevation' in kwargs or 'max_elevation' in kwargs:
+ tiles = self.mask_tiles_to_elevation(
+ kwargs.get('min_elevation'),
+ kwargs.get('max_elevation'),
+ tiles
+ )
+
return tiles
@catch_not_implemented
@@ -531,18 +545,39 @@ class NexusTileService:
tiles = self.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon,
max_lon, dataset, time, **kwargs)
tiles = self.mask_tiles_to_bbox_and_time(min_lat, max_lat, min_lon,
max_lon, time, time, tiles)
+ if 'min_elevation' in kwargs or 'max_elevation' in kwargs:
+ tiles = self.mask_tiles_to_elevation(
+ kwargs.get('min_elevation'),
+ kwargs.get('max_elevation'),
+ tiles
+ )
+
return tiles
def get_tiles_bounded_by_polygon_at_time(self, polygon, dataset, time,
**kwargs):
tiles = self.find_all_tiles_in_polygon_at_time(polygon, dataset, time,
**kwargs)
tiles = self.mask_tiles_to_polygon_and_time(polygon, time, time, tiles)
+ if 'min_elevation' in kwargs or 'max_elevation' in kwargs:
+ tiles = self.mask_tiles_to_elevation(
+ kwargs.get('min_elevation'),
+ kwargs.get('max_elevation'),
+ tiles
+ )
+
return tiles
def get_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon,
dataset, time, **kwargs):
tiles = self.find_all_boundary_tiles_at_time(min_lat, max_lat,
min_lon, max_lon, dataset, time, **kwargs)
tiles = self.mask_tiles_to_bbox_and_time(min_lat, max_lat, min_lon,
max_lon, time, time, tiles)
+ if 'min_elevation' in kwargs or 'max_elevation' in kwargs:
+ tiles = self.mask_tiles_to_elevation(
+ kwargs.get('min_elevation'),
+ kwargs.get('max_elevation'),
+ tiles
+ )
+
return tiles
@catch_not_implemented
@@ -686,6 +721,43 @@ class NexusTileService:
return tiles
def mask_tiles_to_elevation(self, min_e, max_e, tiles):
    """
    Masks data in tiles to the specified elevation range.

    :param min_e: Minimum elevation (inclusive); None means unbounded below
    :param max_e: Maximum elevation (inclusive); None means unbounded above
    :param tiles: List of tiles
    :return: The list of tiles with data masked to the specified elevation
             range; tiles whose data becomes fully masked are dropped.
             Tiles without elevation data pass through unmasked.
    """
    if min_e is None:
        min_e = -float('inf')

    if max_e is None:
        max_e = float('inf')

    for tile in tiles:
        if tile.elevation is None:
            continue

        tile.elevation = ma.masked_outside(tile.elevation, min_e, max_e)

        data_mask = ma.getmaskarray(tile.elevation)

        # If this is multi-var, need to mask each variable separately.
        if tile.is_multi:
            # Combine elevation mask with existing mask on data
            data_mask = reduce(np.logical_or, [tile.data[0].mask, data_mask])

            num_vars = len(tile.data)
            multi_data_mask = np.repeat(data_mask[np.newaxis, ...], num_vars, axis=0)
            tile.data = ma.masked_where(multi_data_mask, tile.data)
        else:
            tile.data = ma.masked_where(data_mask, tile.data)

    # Drop tiles whose data is now entirely masked.
    # NOTE(review): assumes every tile.data is a masked array (has .mask),
    # including tiles skipped above for lacking elevation — confirm callers
    # always provide masked arrays.
    tiles[:] = [tile for tile in tiles if not tile.data.mask.all()]

    return tiles
+
def fetch_data_for_tiles(self, *tiles):
dataset = tiles[0].dataset
diff --git a/docker/nexus-webapp/Dockerfile b/docker/nexus-webapp/Dockerfile
index 1dc6715..ae4d51c 100644
--- a/docker/nexus-webapp/Dockerfile
+++ b/docker/nexus-webapp/Dockerfile
@@ -66,7 +66,7 @@ ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"
# Install additional dependencies for image
RUN apt-get update && \
- apt-get install --no-install-recommends -y proj-bin tini && \
+ apt-get install --no-install-recommends -y proj-bin tini git && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
diff --git a/docker/solr/cloud-init/create-collection.py
b/docker/solr/cloud-init/create-collection.py
index 682dd62..881b4d0 100755
--- a/docker/solr/cloud-init/create-collection.py
+++ b/docker/solr/cloud-init/create-collection.py
@@ -142,6 +142,8 @@ try:
add_field(schema_api, 'tile_min_lat', 'pdouble')
add_field(schema_api, 'tile_max_lon', 'pdouble')
add_field(schema_api, 'tile_min_lon', 'pdouble')
+ add_field(schema_api, 'tile_min_elevation_d', 'pdouble')
+ add_field(schema_api, 'tile_max_elevation_d', 'pdouble')
finally:
zk.stop()