This is an automated email from the ASF dual-hosted git repository.
nchung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/master by this push:
new 240ce1f SDAP-371: Fix domssubset API (#172)
240ce1f is described below
commit 240ce1fc4bba5d2d95de073b34a1abe513374ab5
Author: Stepheny Perez <[email protected]>
AuthorDate: Thu Jul 7 19:26:44 2022 -0700
SDAP-371: Fix domssubset API (#172)
* Fixed DOMS subsetter for satellite data
* Updated changelog
* Rename CSV file within ZIP based on dataset name
* Updated doms subsetter to support insitu subsetting
* Updated matchup algorithm to use insitu module
* Removed temp code
---
CHANGELOG.md | 2 +
analysis/tests/algorithms/test_subsetter.py | 126 ++++++++++++
analysis/webservice/algorithms/doms/insitu.py | 66 ++++++
analysis/webservice/algorithms/doms/subsetter.py | 244 ++++++++++++++---------
analysis/webservice/algorithms_spark/Matchup.py | 58 +-----
analysis/webservice/apidocs/openapi.yml | 6 +-
data-access/nexustiles/model/nexusmodel.py | 20 +-
data-access/nexustiles/nexustiles.py | 11 +-
8 files changed, 370 insertions(+), 163 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9b0642c..9d0bb20 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,10 +11,12 @@ and this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0
- SDAP-372: Added new matchup endpoint `match_spark_doms` that points to DOMS
insitu endpoint
- SDAP-372: Updated `match_spark_doms` to interface with samos_cdms endpoint
- SDAP-393: Included `insitu` in ingress based on the value of
`insituAPI.enabled` in `values.yaml`
+- SDAP-371: Renamed `/domssubset` endpoint to `/cdmssubset`
### Changed
### Deprecated
### Removed
### Fixed
- Fix failing test_matchup unit test
- Fixed null value for count in matchup response
+- SDAP-371: Fixed DOMS subset endpoint
### Security
\ No newline at end of file
diff --git a/analysis/tests/algorithms/test_subsetter.py
b/analysis/tests/algorithms/test_subsetter.py
new file mode 100644
index 0000000..af99348
--- /dev/null
+++ b/analysis/tests/algorithms/test_subsetter.py
@@ -0,0 +1,126 @@
+import shutil
+import tempfile
+from os import listdir
+from os.path import dirname, join, realpath, abspath, isfile
+
+import pytest
+from webservice.algorithms.doms.subsetter import SubsetResult
+
+
def test_to_csv(input_data):
    """
    Verify the CSV rendering of a SubsetResult for each input dataset.
    """
    subset_result = SubsetResult(
        results=input_data,
        meta=None
    )

    csv_by_dataset = subset_result.toCsv()

    # Rows are CRLF-separated with no trailing newline.
    expected_ascat = '\r\n'.join([
        'longitude,latitude,time,wind_speed,wind_to_direction',
        '179.97830000000002,-27.6048,2018-09-24T09:10:03Z,0.32999998331069946,98.30000305175781',
        '179.9642,-27.49519,2018-09-24T09:10:05Z,0.699999988079071,179.5',
    ])

    expected_icoads = '\r\n'.join([
        'longitude,latitude,time,depth,platform_code,project,provider,'
        'sea_water_temperature,sea_water_temperature_quality',
        '160.47,-28.38,2018-09-24T08:00:00Z,-99999.0,42,ICOADS Release 3.0,NCAR,18.6,1',
        '179.88,-27.14,2018-09-24T08:00:00Z,-99999.0,42,ICOADS Release 3.0,NCAR,20.6,1',
    ])

    assert 'ASCATB-L2-Coastal' in csv_by_dataset
    assert csv_by_dataset['ASCATB-L2-Coastal'] == expected_ascat

    assert 'ICOADS Release 3.0' in csv_by_dataset
    assert csv_by_dataset['ICOADS Release 3.0'] == expected_icoads
+
+
def test_to_zip(input_data, temp_dir):
    """
    Verify the ZIP output contains one CSV per subsetted dataset.
    """
    subset_result = SubsetResult(
        results=input_data,
        meta=None
    )

    zip_bytes = subset_result.toZip()
    archive_path = join(temp_dir, 'response.zip')
    extract_dir = join(temp_dir, 'zip_contents')

    with open(archive_path, 'wb') as archive:
        archive.write(zip_bytes)

    shutil.unpack_archive(archive_path, extract_dir)

    extracted_files = sorted(
        name for name in listdir(extract_dir) if isfile(join(extract_dir, name))
    )

    # Exactly one CSV per dataset, named after the dataset short name.
    assert extracted_files == ['ASCATB-L2-Coastal.csv', 'ICOADS Release 3.0.csv']
+
+
@pytest.fixture
def temp_dir():
    """Yield a scratch directory under tests/data, removed after the test."""
    here = dirname(realpath(__file__))
    data_dir = abspath(join(here, '..', 'data'))
    scratch_dir = tempfile.mkdtemp(dir=data_dir)
    yield scratch_dir
    shutil.rmtree(scratch_dir)
+
+
@pytest.fixture
def input_data():
    """
    Sample subset results keyed by dataset short name: two satellite points
    (ASCATB-L2-Coastal) and two in-situ points (ICOADS Release 3.0).

    Each point carries latitude/longitude, an epoch-seconds 'time'
    (2018-09-24 UTC), and a 'data' dict of variable name -> value.
    """
    data_dict = {
        "ASCATB-L2-Coastal": [
            {
                "latitude": -27.6048,
                "longitude": 179.97830000000002,
                "time": 1537780203,
                "data": {
                    "wind_speed": 0.32999998331069946,
                    "wind_to_direction": 98.30000305175781
                }
            },
            {
                "latitude": -27.49519,
                "longitude": 179.9642,
                "time": 1537780205,
                "data": {
                    "wind_speed": 0.699999988079071,
                    "wind_to_direction": 179.5
                }
            }
        ],
        "ICOADS Release 3.0": [
            {
                "latitude": -28.38,
                "longitude": 160.47,
                "time": 1537776000.0,
                "data": {
                    "depth": -99999.0,
                    "provider": "NCAR",
                    "project": "ICOADS Release 3.0",
                    "platform_code": "42",
                    "sea_water_temperature": 18.6,
                    "sea_water_temperature_quality": 1
                }
            },
            {
                "latitude": -27.14,
                "longitude": 179.88,
                "time": 1537776000.0,
                "data": {
                    "depth": -99999.0,
                    "provider": "NCAR",
                    "project": "ICOADS Release 3.0",
                    "platform_code": "42",
                    "sea_water_temperature": 20.6,
                    "sea_water_temperature_quality": 1
                }
            }
        ]
    }
    yield data_dict
diff --git a/analysis/webservice/algorithms/doms/insitu.py
b/analysis/webservice/algorithms/doms/insitu.py
new file mode 100644
index 0000000..0344839
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/insitu.py
@@ -0,0 +1,66 @@
+"""
+Module for querying CDMS In-Situ API
+"""
+import logging
+import requests
+from datetime import datetime
+from webservice.algorithms.doms import config as insitu_endpoints
+
+
def query_insitu(dataset, variable, start_time, end_time, bbox, platform, depth_min, depth_max,
                 items_per_page=1000, session=None):
    """
    Query the CDMS In-Situ API and aggregate all pages of results.

    start_time/end_time may be epoch seconds or pre-formatted
    'YYYY-MM-DDTHH:MM:SSZ' strings. Returns the decoded JSON response with
    the 'results' lists of every page concatenated.
    """
    try:
        start_time = datetime.utcfromtimestamp(start_time).strftime('%Y-%m-%dT%H:%M:%SZ')
    except TypeError:
        pass  # assume we were passed a properly formatted string

    try:
        end_time = datetime.utcfromtimestamp(end_time).strftime('%Y-%m-%dT%H:%M:%SZ')
    except TypeError:
        pass  # assume we were passed a properly formatted string

    params = {
        'itemsPerPage': items_per_page,
        'startTime': start_time,
        'endTime': end_time,
        'bbox': bbox,
        'minDepth': depth_min,
        'maxDepth': depth_max,
        'provider': insitu_endpoints.get_provider_name(dataset),
        'project': dataset,
        'platform': platform,
    }
    if variable is not None:
        params['variable'] = variable

    http_get = session.get if session is not None else requests.get

    aggregated = {}
    next_page_url = insitu_endpoints.getEndpoint()
    while next_page_url not in (None, 'NA'):
        logging.debug(f'Insitu request {next_page_url}')
        response = http_get(next_page_url, params=params)
        response.raise_for_status()

        page = response.json()
        if aggregated:
            aggregated['results'].extend(page['results'])
        else:
            aggregated = page

        next_page_url = page.get('next', None)
        # Subsequent page URLs already embed the query parameters.
        params = {}

    return aggregated
diff --git a/analysis/webservice/algorithms/doms/subsetter.py
b/analysis/webservice/algorithms/doms/subsetter.py
index 4c1ff97..12021b6 100644
--- a/analysis/webservice/algorithms/doms/subsetter.py
+++ b/analysis/webservice/algorithms/doms/subsetter.py
@@ -15,17 +15,18 @@
import logging
import os
-import tempfile
+import io
import zipfile
+from pytz import timezone
from datetime import datetime
-import requests
-
from . import BaseDomsHandler
from webservice.NexusHandler import nexus_handler
-from webservice.webmodel import NexusProcessingException
+from webservice.webmodel import NexusProcessingException, NexusResults
+from webservice.algorithms.doms.insitu import query_insitu
ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
def is_blank(my_string):
@@ -34,9 +35,9 @@ def is_blank(my_string):
@nexus_handler
class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
- name = "DOMS Subsetter"
- path = "/domssubset"
- description = "Subset DOMS sources given the search domain"
+ name = "CDMS Subsetter"
+ path = "/cdmssubset"
+ description = "Subset CDMS sources given the search domain"
params = {
"dataset": {
@@ -52,7 +53,7 @@ class
DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
"parameter": {
"name": "Data Parameter",
"type": "string",
- "description": "The parameter of interest. One of 'sst', 'sss',
'wind'. Required"
+ "description": "The insitu parameter of interest. Only required if
insitu is present."
},
"startTime": {
"name": "Start Time",
@@ -73,12 +74,12 @@ class
DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
"depthMin": {
"name": "Minimum Depth",
"type": "float",
- "description": "Minimum depth of measurements. Must be less than
depthMax. Optional"
+ "description": "Minimum depth of measurements. Must be less than
depthMax. Default 0. Optional"
},
"depthMax": {
"name": "Maximum Depth",
"type": "float",
- "description": "Maximum depth of measurements. Must be greater
than depthMin. Optional"
+ "description": "Maximum depth of measurements. Must be greater
than depthMin. Default 5. Optional"
},
"platforms": {
"name": "Platforms",
@@ -113,22 +114,24 @@ class
DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
matchup_ds_names = matchup_ds_names.split(',')
except:
raise NexusProcessingException(reason="'insitu' argument
should be a comma-seperated list", code=400)
+ else:
+ matchup_ds_names = []
parameter_s = request.get_argument('parameter', None)
- if parameter_s not in ['sst', 'sss', 'wind']:
+ if parameter_s is None and len(matchup_ds_names) > 0:
raise NexusProcessingException(
- reason="Parameter %s not supported. Must be one of 'sst',
'sss', 'wind'." % parameter_s, code=400)
+ reason="Parameter must be provided for insitu subset." %
parameter_s, code=400)
try:
start_time = request.get_start_datetime()
- start_time = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
+ start_time = int((start_time - EPOCH).total_seconds())
except:
raise NexusProcessingException(
reason="'startTime' argument is required. Can be int value
seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
code=400)
try:
end_time = request.get_end_datetime()
- end_time = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
+ end_time = int((end_time - EPOCH).total_seconds())
except:
raise NexusProcessingException(
reason="'endTime' argument is required. Can be int value
seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
@@ -147,8 +150,8 @@ class
DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
reason="'b' argument is required. Must be comma-delimited
float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude,
Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
code=400)
- depth_min = request.get_decimal_arg('depthMin', default=None)
- depth_max = request.get_decimal_arg('depthMax', default=None)
+ depth_min = request.get_decimal_arg('depthMin', default=0)
+ depth_max = request.get_decimal_arg('depthMax', default=5)
if depth_min is not None and depth_max is not None and depth_min >=
depth_max:
raise NexusProcessingException(
@@ -167,94 +170,143 @@ class
DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
bounding_polygon, depth_min, depth_max, platforms
def calc(self, request, **args):
-
primary_ds_name, matchup_ds_names, parameter_s, start_time, end_time, \
bounding_polygon, depth_min, depth_max, platforms =
self.parse_arguments(request)
- primary_url = "https://doms.jpl.nasa.gov/datainbounds"
- primary_params = {
- 'ds': primary_ds_name,
- 'parameter': parameter_s,
- 'b': ','.join([str(bound) for bound in bounding_polygon.bounds]),
- 'startTime': start_time,
- 'endTime': end_time,
- 'output': "CSV"
- }
-
- matchup_url = "https://doms.jpl.nasa.gov/domsinsitusubset"
- matchup_params = {
- 'source': None,
- 'parameter': parameter_s,
- 'startTime': start_time,
- 'endTime': end_time,
- 'b': ','.join([str(bound) for bound in bounding_polygon.bounds]),
- 'depthMin': depth_min,
- 'depthMax': depth_max,
- 'platforms': platforms,
- 'output': 'CSV'
- }
-
- primary_temp_file_path = None
- matchup_downloads = None
-
- with requests.session() as session:
-
- if not is_blank(primary_ds_name):
- # Download primary
- primary_temp_file, primary_temp_file_path =
tempfile.mkstemp(suffix='.csv')
- download_file(primary_url, primary_temp_file_path, session,
params=primary_params)
-
- if matchup_ds_names is not None and len(matchup_ds_names) > 0:
- # Download matchup
- matchup_downloads = {}
- for matchup_ds in matchup_ds_names:
- matchup_downloads[matchup_ds] =
tempfile.mkstemp(suffix='.csv')
- matchup_params['source'] = matchup_ds
- download_file(matchup_url,
matchup_downloads[matchup_ds][1], session, params=matchup_params)
-
- # Zip downloads
- date_range = "%s-%s" % (datetime.strptime(start_time,
"%Y-%m-%dT%H:%M:%SZ").strftime("%Y%m%d"),
- datetime.strptime(end_time,
"%Y-%m-%dT%H:%M:%SZ").strftime("%Y%m%d"))
- bounds = '%.4fW_%.4fS_%.4fE_%.4fN' % bounding_polygon.bounds
- zip_dir = tempfile.mkdtemp()
- zip_path = '%s/subset.%s.%s.zip' % (zip_dir, date_range, bounds)
- with zipfile.ZipFile(zip_path, 'w') as my_zip:
- if primary_temp_file_path:
- my_zip.write(primary_temp_file_path, arcname='%s.%s.%s.csv' %
(primary_ds_name, date_range, bounds))
- if matchup_downloads:
- for matchup_ds, download in matchup_downloads.items():
- my_zip.write(download[1], arcname='%s.%s.%s.csv' %
(matchup_ds, date_range, bounds))
-
- # Clean up
- if primary_temp_file_path:
- os.remove(primary_temp_file_path)
- if matchup_downloads:
- for matchup_ds, download in matchup_downloads.items():
- os.remove(download[1])
-
- return SubsetResult(zip_path)
-
-
-class SubsetResult(object):
- def __init__(self, zip_path):
- self.zip_path = zip_path
-
+ min_lat = max_lat = min_lon = max_lon = None
+ if bounding_polygon:
+ min_lat = bounding_polygon.bounds[1]
+ max_lat = bounding_polygon.bounds[3]
+ min_lon = bounding_polygon.bounds[0]
+ max_lon = bounding_polygon.bounds[2]
+
+ tiles = self._get_tile_service().get_tiles_bounded_by_box(
+ min_lat=min_lat, max_lat=max_lat, min_lon=min_lon,
+ max_lon=max_lon, ds=primary_ds_name, start_time=start_time,
+ end_time=end_time
+ )
+
+ # Satellite
+ data = []
+ data_dict = {}
+ for tile in tiles:
+ for nexus_point in tile.nexus_point_generator():
+ if tile.is_multi:
+ data_points = {
+ tile.variables[idx].standard_name:
nexus_point.data_vals[idx]
+ for idx in range(len(tile.variables))
+ }
+ else:
+ data_points = {tile.variables[0].standard_name:
nexus_point.data_vals}
+ data.append({
+ 'latitude': nexus_point.latitude,
+ 'longitude': nexus_point.longitude,
+ 'time': nexus_point.time,
+ 'data': data_points
+ })
+ data_dict[primary_ds_name] = data
+
+ # In-situ
+ non_data_fields = [
+ 'meta',
+ 'platform',
+ 'job_id',
+ 'latitude',
+ 'longitude',
+ 'time',
+ ]
+ for insitu_dataset in matchup_ds_names:
+ data = []
+ edge_response = query_insitu(
+ dataset=insitu_dataset,
+ variable=parameter_s,
+ start_time=start_time,
+ end_time=end_time,
+ bbox=','.join(list(map(str, [min_lon, min_lat, max_lon,
max_lat]))),
+ platform=platforms,
+ depth_min=depth_min,
+ depth_max=depth_max
+ )
+ for result in edge_response['results']:
+ data_points = {
+ key: value for key, value in result.items()
+ if value is not None and key not in non_data_fields
+ }
+ data.append({
+ 'latitude': result['latitude'],
+ 'longitude': result['longitude'],
+ 'time': (datetime.strptime(result['time'],
'%Y-%m-%dT%H:%M:%SZ') - datetime.fromtimestamp(0)).total_seconds(),
+ 'data': data_points
+ })
+ data_dict[insitu_dataset] = data
+
+ if len(tiles) > 0:
+ meta = [tile.get_summary() for tile in tiles]
+ else:
+ meta = None
+
+ result = SubsetResult(
+ results=data_dict,
+ meta=meta
+ )
+
+ result.extendMeta(min_lat, max_lat, min_lon, max_lon, "", start_time,
end_time)
+
+ return result
+
+
class SubsetResult(NexusResults):
    """
    Result of a CDMS subset request.

    `results()` maps dataset short name -> list of point dicts
    ({'latitude', 'longitude', 'time' (epoch seconds), 'data': {...}}).
    """

    def toJson(self):
        # JSON output is not supported for subset results; use CSV or ZIP.
        raise NotImplementedError

    def toCsv(self):
        """
        Convert results to CSV.

        :return: dict mapping dataset name -> CRLF-delimited CSV string whose
            columns are longitude, latitude, ISO-8601 time, then the sorted
            union of all data variable names for that dataset.
        """
        dataset_results = self.results()
        csv_results = {}

        for dataset_name, results in dataset_results.items():
            headers = [
                'longitude',
                'latitude',
                'time'
            ]
            data_variables = sorted({key for result in results for key in result['data']})
            headers.extend(data_variables)

            # Header row is always emitted, even for an empty result set.
            rows = [','.join(headers)]
            for result in results:
                cols = [
                    str(result['longitude']),
                    str(result['latitude']),
                    datetime.utcfromtimestamp(result['time']).strftime('%Y-%m-%dT%H:%M:%SZ')
                ]
                # Not every point carries every variable (insitu points drop
                # null fields), so default missing variables to an empty cell
                # instead of raising KeyError.
                cols.extend(str(result['data'].get(var, '')) for var in data_variables)
                rows.append(','.join(cols))

            csv_results[dataset_name] = '\r\n'.join(rows)
        return csv_results

    def toZip(self):
        """
        Convert csv results to zip. Each subsetted dataset becomes a csv
        inside the zip named as <dataset-short-name>.csv

        :return: bytes of the in-memory zip archive
        """
        csv_results = self.toCsv()

        buffer = io.BytesIO()
        with zipfile.ZipFile(buffer, 'a', zipfile.ZIP_DEFLATED) as zip_file:
            for dataset_name, csv_contents in csv_results.items():
                zip_file.writestr(f'{dataset_name}.csv', csv_contents)

        buffer.seek(0)
        return buffer.read()

    def cleanup(self):
        # Results are now held entirely in memory (toZip writes to a BytesIO),
        # so there is no temp file to delete. The previous implementation
        # removed self.zip_path, which this class no longer sets and which
        # would therefore raise AttributeError.
        pass
-
-
-def download_file(url, filepath, session, params=None):
- r = session.get(url, params=params, stream=True)
- with open(filepath, 'wb') as f:
- for chunk in r.iter_content(chunk_size=1024):
- if chunk: # filter out keep-alive new chunks
- f.write(chunk)
diff --git a/analysis/webservice/algorithms_spark/Matchup.py
b/analysis/webservice/algorithms_spark/Matchup.py
index 5ce97a8..671a935 100644
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -40,6 +40,7 @@ from webservice.algorithms.doms import config as
edge_endpoints
from webservice.algorithms.doms import values as doms_values
from webservice.algorithms.doms.BaseDomsHandler import DomsQueryResults
from webservice.algorithms.doms.ResultsStorage import ResultsStorage
+from webservice.algorithms.doms.insitu import query_insitu as query_edge
from webservice.webmodel import NexusProcessingException
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
@@ -816,60 +817,3 @@ def match_tile_to_point_generator(tile_service, tile_id,
m_tree, edge_results, s
for m_point_index in point_matches:
m_doms_point =
DomsPoint.from_edge_point(edge_results[m_point_index])
yield p_doms_point, m_doms_point
-
-
-def query_edge(dataset, variable, startTime, endTime, bbox, platform,
depth_min, depth_max, itemsPerPage=1000,
- startIndex=0, stats=True, session=None):
- try:
- startTime =
datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%dT%H:%M:%SZ')
- except TypeError:
- # Assume we were passed a properly formatted string
- pass
-
- try:
- endTime =
datetime.utcfromtimestamp(endTime).strftime('%Y-%m-%dT%H:%M:%SZ')
- except TypeError:
- # Assume we were passed a properly formatted string
- pass
-
- provider = edge_endpoints.get_provider_name(dataset)
-
- params = {
- "itemsPerPage": itemsPerPage,
- "startTime": startTime,
- "endTime": endTime,
- "bbox": bbox,
- "minDepth": depth_min,
- "maxDepth": depth_max,
- "provider": provider,
- "project": dataset,
- "platform": platform,
- }
-
- if variable is not None:
- params["variable"] = variable
-
- edge_response = {}
-
- # Get all edge results
- next_page_url = edge_endpoints.getEndpoint()
- while next_page_url is not None and next_page_url != 'NA':
- logging.debug(f'Edge request {next_page_url}')
- if session is not None:
- edge_page_request = session.get(next_page_url, params=params)
- else:
- edge_page_request = requests.get(next_page_url, params=params)
-
- edge_page_request.raise_for_status()
-
- edge_page_response = json.loads(edge_page_request.text)
-
- if not edge_response:
- edge_response = edge_page_response
- else:
- edge_response['results'].extend(edge_page_response['results'])
-
- next_page_url = edge_page_response.get('next', None)
- params = {} # Remove params, they are already included in above URL
-
- return edge_response
diff --git a/analysis/webservice/apidocs/openapi.yml
b/analysis/webservice/apidocs/openapi.yml
index 2754463..bb20c1a 100644
--- a/analysis/webservice/apidocs/openapi.yml
+++ b/analysis/webservice/apidocs/openapi.yml
@@ -333,10 +333,10 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/Error'
- /domssubset:
+ /cdmssubset:
get:
- summary: Subset DOMS sources given the search domain
- operationId: domssubset
+ summary: Subset CDMS sources given the search domain
+ operationId: cdmssubset
tags:
- Subsetting
parameters:
diff --git a/data-access/nexustiles/model/nexusmodel.py
b/data-access/nexustiles/model/nexusmodel.py
index 753d264..f5c9df6 100644
--- a/data-access/nexustiles/model/nexusmodel.py
+++ b/data-access/nexustiles/model/nexusmodel.py
@@ -126,22 +126,30 @@ class Tile(object):
return summary
def nexus_point_generator(self, include_nan=False):
+ indices = self.get_indices(include_nan)
+
if include_nan:
- for index in np.ndindex(self.data.shape):
+ for index in indices:
time = self.times[index[0]]
lat = self.latitudes[index[1]]
lon = self.longitudes[index[2]]
- data_val = self.data[index]
- point = NexusPoint(lat, lon, None, time, index, data_val)
+ if self.is_multi:
+ data_vals = [data[index] for data in self.data]
+ else:
+ data_vals = self.data[index]
+ point = NexusPoint(lat, lon, None, time, index, data_vals)
yield point
else:
- for index in np.transpose(np.ma.nonzero(self.data)):
+ for index in indices:
index = tuple(index)
time = self.times[index[0]]
lat = self.latitudes[index[1]]
lon = self.longitudes[index[2]]
- data_val = self.data[index]
- point = NexusPoint(lat, lon, None, time, index, data_val)
+ if self.is_multi:
+ data_vals = [data[index] for data in self.data]
+ else:
+ data_vals = self.data[index]
+ point = NexusPoint(lat, lon, None, time, index, data_vals)
yield point
def get_indices(self, include_nan=False):
diff --git a/data-access/nexustiles/nexustiles.py
b/data-access/nexustiles/nexustiles.py
index 7483c2b..a3aa61e 100644
--- a/data-access/nexustiles/nexustiles.py
+++ b/data-access/nexustiles/nexustiles.py
@@ -423,7 +423,16 @@ class NexusTileService(object):
| ma.getmaskarray(tile.latitudes)[np.newaxis, :,
np.newaxis] \
| ma.getmaskarray(tile.longitudes)[np.newaxis,
np.newaxis, :]
- tile.data = ma.masked_where(data_mask, tile.data)
+ # If this is multi-var, need to mask each variable separately.
+ if tile.is_multi:
+ # Combine space/time mask with existing mask on data
+ data_mask = reduce(np.logical_or, [tile.data[0].mask,
data_mask])
+
+ num_vars = len(tile.data)
+ multi_data_mask = np.repeat(data_mask[np.newaxis, ...],
num_vars, axis=0)
+ tile.data = ma.masked_where(multi_data_mask, tile.data)
+ else:
+ tile.data = ma.masked_where(data_mask, tile.data)
tiles[:] = [tile for tile in tiles if not tile.data.mask.all()]