This is an automated email from the ASF dual-hosted git repository. skperez pushed a commit to branch SDAP-371 in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit e20466c7fe77aceebf2fda997cb3cb41b32c6753 Author: skorper <[email protected]> AuthorDate: Thu Jul 7 16:18:26 2022 -0700 Updated doms subsetter to support insitu subsetting --- analysis/tests/algorithms/test_subsetter.py | 126 +++++++++++++++++++++++ analysis/webservice/algorithms/doms/insitu.py | 66 ++++++++++++ analysis/webservice/algorithms/doms/subsetter.py | 103 +++++++++++------- analysis/webservice/apidocs/openapi.yml | 6 +- 4 files changed, 262 insertions(+), 39 deletions(-) diff --git a/analysis/tests/algorithms/test_subsetter.py b/analysis/tests/algorithms/test_subsetter.py new file mode 100644 index 0000000..af99348 --- /dev/null +++ b/analysis/tests/algorithms/test_subsetter.py @@ -0,0 +1,126 @@ +import shutil +import tempfile +from os import listdir +from os.path import dirname, join, realpath, abspath, isfile + +import pytest +from webservice.algorithms.doms.subsetter import SubsetResult + + +def test_to_csv(input_data): + """ + Test that csv output contains the expected contents + """ + result = SubsetResult( + results=input_data, + meta=None + ) + + csv_result = result.toCsv() + + expected_result_ascat = '''\ +longitude,latitude,time,wind_speed,wind_to_direction\r +179.97830000000002,-27.6048,2018-09-24T09:10:03Z,0.32999998331069946,98.30000305175781\r +179.9642,-27.49519,2018-09-24T09:10:05Z,0.699999988079071,179.5''' + + expected_result_icoads = '''\ +longitude,latitude,time,depth,platform_code,project,provider,sea_water_temperature,sea_water_temperature_quality\r +160.47,-28.38,2018-09-24T08:00:00Z,-99999.0,42,ICOADS Release 3.0,NCAR,18.6,1\r +179.88,-27.14,2018-09-24T08:00:00Z,-99999.0,42,ICOADS Release 3.0,NCAR,20.6,1''' + + assert 'ASCATB-L2-Coastal' in csv_result + assert csv_result['ASCATB-L2-Coastal'] == expected_result_ascat + + assert 'ICOADS Release 3.0' in csv_result + assert csv_result['ICOADS Release 3.0'] == expected_result_icoads + + +def test_to_zip(input_data, temp_dir): + """ + Test that zip output contains the expected 
contents + """ + result = SubsetResult( + results=input_data, + meta=None + ) + + zip_result = result.toZip() + zip_location = join(temp_dir, 'response.zip') + unzip_location = join(temp_dir, 'zip_contents') + + with open(zip_location, 'wb') as out_zip: + out_zip.write(zip_result) + + shutil.unpack_archive(zip_location, unzip_location) + + zip_contents = [ + f for f in listdir(unzip_location) if isfile(join(unzip_location, f)) + ] + + assert len(zip_contents) == 2 + assert 'ASCATB-L2-Coastal.csv' in zip_contents + assert 'ICOADS Release 3.0.csv' in zip_contents + + [email protected] +def temp_dir(): + test_dir = dirname(realpath(__file__)) + test_data_dir = abspath(join(test_dir, '..', 'data')) + temp_data_dir = tempfile.mkdtemp(dir=test_data_dir) + yield temp_data_dir + shutil.rmtree(temp_data_dir) + + [email protected] +def input_data(): + data_dict = { + "ASCATB-L2-Coastal": [ + { + "latitude": -27.6048, + "longitude": 179.97830000000002, + "time": 1537780203, + "data": { + "wind_speed": 0.32999998331069946, + "wind_to_direction": 98.30000305175781 + } + }, + { + "latitude": -27.49519, + "longitude": 179.9642, + "time": 1537780205, + "data": { + "wind_speed": 0.699999988079071, + "wind_to_direction": 179.5 + } + } + ], + "ICOADS Release 3.0": [ + { + "latitude": -28.38, + "longitude": 160.47, + "time": 1537776000.0, + "data": { + "depth": -99999.0, + "provider": "NCAR", + "project": "ICOADS Release 3.0", + "platform_code": "42", + "sea_water_temperature": 18.6, + "sea_water_temperature_quality": 1 + } + }, + { + "latitude": -27.14, + "longitude": 179.88, + "time": 1537776000.0, + "data": { + "depth": -99999.0, + "provider": "NCAR", + "project": "ICOADS Release 3.0", + "platform_code": "42", + "sea_water_temperature": 20.6, + "sea_water_temperature_quality": 1 + } + } + ] + } + yield data_dict diff --git a/analysis/webservice/algorithms/doms/insitu.py b/analysis/webservice/algorithms/doms/insitu.py new file mode 100644 index 0000000..0344839 --- /dev/null +++ 
b/analysis/webservice/algorithms/doms/insitu.py @@ -0,0 +1,66 @@ +""" +Module for querying CDMS In-Situ API +""" +import logging +import requests +from datetime import datetime +from webservice.algorithms.doms import config as insitu_endpoints + + +def query_insitu(dataset, variable, start_time, end_time, bbox, platform, depth_min, depth_max, + items_per_page=1000, session=None): + """ + Query insitu API, page through results, and aggregate + """ + try: + start_time = datetime.utcfromtimestamp(start_time).strftime('%Y-%m-%dT%H:%M:%SZ') + except TypeError: + # Assume we were passed a properly formatted string + pass + + try: + end_time = datetime.utcfromtimestamp(end_time).strftime('%Y-%m-%dT%H:%M:%SZ') + except TypeError: + # Assume we were passed a properly formatted string + pass + + provider = insitu_endpoints.get_provider_name(dataset) + + params = { + 'itemsPerPage': items_per_page, + 'startTime': start_time, + 'endTime': end_time, + 'bbox': bbox, + 'minDepth': depth_min, + 'maxDepth': depth_max, + 'provider': provider, + 'project': dataset, + 'platform': platform, + } + + if variable is not None: + params['variable'] = variable + + insitu_response = {} + + # Page through all insitu results + next_page_url = insitu_endpoints.getEndpoint() + while next_page_url is not None and next_page_url != 'NA': + logging.debug(f'Insitu request {next_page_url}') + if session is not None: + response = session.get(next_page_url, params=params) + else: + response = requests.get(next_page_url, params=params) + + response.raise_for_status() + insitu_page_response = response.json() + + if not insitu_response: + insitu_response = insitu_page_response + else: + insitu_response['results'].extend(insitu_page_response['results']) + + next_page_url = insitu_page_response.get('next', None) + params = {} # Remove params, they are already included in above URL + + return insitu_response diff --git a/analysis/webservice/algorithms/doms/subsetter.py 
b/analysis/webservice/algorithms/doms/subsetter.py index 824f113..12021b6 100644 --- a/analysis/webservice/algorithms/doms/subsetter.py +++ b/analysis/webservice/algorithms/doms/subsetter.py @@ -16,16 +16,14 @@ import logging import os import io -import tempfile import zipfile from pytz import timezone from datetime import datetime -import requests - from . import BaseDomsHandler from webservice.NexusHandler import nexus_handler from webservice.webmodel import NexusProcessingException, NexusResults +from webservice.algorithms.doms.insitu import query_insitu ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) @@ -55,7 +53,7 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler): "parameter": { "name": "Data Parameter", "type": "string", - "description": "The parameter of interest. One of 'sst', 'sss', 'wind'. Required" + "description": "The insitu parameter of interest. Only required if insitu is present." }, "startTime": { "name": "Start Time", @@ -76,12 +74,12 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler): "depthMin": { "name": "Minimum Depth", "type": "float", - "description": "Minimum depth of measurements. Must be less than depthMax. Optional" + "description": "Minimum depth of measurements. Must be less than depthMax. Default 0. Optional" }, "depthMax": { "name": "Maximum Depth", "type": "float", - "description": "Maximum depth of measurements. Must be greater than depthMin. Optional" + "description": "Maximum depth of measurements. Must be greater than depthMin. Default 5. 
Optional" }, "platforms": { "name": "Platforms", "type": "string", "description": "Platforms to include for subset consideration. Optional" }, @@ -116,11 +114,13 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler): matchup_ds_names = matchup_ds_names.split(',') except: raise NexusProcessingException(reason="'insitu' argument should be a comma-separated list", code=400) + else: + matchup_ds_names = [] parameter_s = request.get_argument('parameter', None) - if parameter_s not in ['sst', 'sss', 'wind']: + if parameter_s is None and len(matchup_ds_names) > 0: raise NexusProcessingException( - reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400) + reason="Parameter must be provided for insitu subset.", code=400) try: start_time = request.get_start_datetime() @@ -150,8 +150,8 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler): reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude", code=400) - depth_min = request.get_decimal_arg('depthMin', default=None) - depth_max = request.get_decimal_arg('depthMax', default=None) + depth_min = request.get_decimal_arg('depthMin', default=0) + depth_max = request.get_decimal_arg('depthMax', default=5) if depth_min is not None and depth_max is not None and depth_min >= depth_max: raise NexusProcessingException( @@ -180,15 +180,15 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler): min_lat = bounding_polygon.bounds[1] max_lat = bounding_polygon.bounds[3] min_lon = bounding_polygon.bounds[0] max_lon = bounding_polygon.bounds[2] - tiles = self._get_tile_service().get_tiles_bounded_by_box(min_lat, max_lat, min_lon, - max_lon, primary_ds_name, start_time, - end_time) - else: - tiles = [] # todo - # tiles = self._get_tile_service().get_tiles_by_metadata(metadata_filter, ds, start_time, - # end_time) + tiles = self._get_tile_service().get_tiles_bounded_by_box( + min_lat=min_lat, max_lat=max_lat, min_lon=min_lon, + max_lon=max_lon, 
ds=primary_ds_name, start_time=start_time, + end_time=end_time + ) + # Satellite data = [] + data_dict = {} for tile in tiles: for nexus_point in tile.nexus_point_generator(): if tile.is_multi: @@ -204,7 +204,42 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler): 'time': nexus_point.time, 'data': data_points }) - data_dict = {primary_ds_name: data} + data_dict[primary_ds_name] = data + + # In-situ + non_data_fields = [ + 'meta', + 'platform', + 'job_id', + 'latitude', + 'longitude', + 'time', + ] + for insitu_dataset in matchup_ds_names: + data = [] + edge_response = query_insitu( + dataset=insitu_dataset, + variable=parameter_s, + start_time=start_time, + end_time=end_time, + bbox=','.join(list(map(str, [min_lon, min_lat, max_lon, max_lat]))), + platform=platforms, + depth_min=depth_min, + depth_max=depth_max + ) + for result in edge_response['results']: + data_points = { + key: value for key, value in result.items() + if value is not None and key not in non_data_fields + } + data.append({ + 'latitude': result['latitude'], + 'longitude': result['longitude'], + 'time': (datetime.strptime(result['time'], '%Y-%m-%dT%H:%M:%SZ') - datetime.utcfromtimestamp(0)).total_seconds(), + 'data': data_points + }) + data_dict[insitu_dataset] = data + if len(tiles) > 0: meta = [tile.get_summary() for tile in tiles] else: @@ -226,21 +261,21 @@ class SubsetResult(NexusResults): def toCsv(self): """ - Convert results to CSV + Convert results to csv """ - rows = [] - - headers = [ - 'longitude', - 'latitude', - 'time' - ] - dataset_results = self.results() csv_results = {} for dataset_name, results in dataset_results.items(): - data_variables = set([keys for result in results for keys in result['data'].keys()]) + rows = [] + + headers = [ + 'longitude', + 'latitude', + 'time' + ] + data_variables = list(set([keys for result in results for keys in result['data'].keys()])) + data_variables.sort() headers.extend(data_variables) for i, result in 
enumerate(results): cols = [] @@ -259,6 +294,10 @@ class SubsetResult(NexusResults): return csv_results def toZip(self): + """ + Convert csv results to zip. Each subsetted dataset becomes a csv + inside the zip named as <dataset-short-name>.csv + """ csv_results = self.toCsv() buffer = io.BytesIO() @@ -271,11 +310,3 @@ class SubsetResult(NexusResults): def cleanup(self): os.remove(self.zip_path) - - -def download_file(url, filepath, session, params=None): - r = session.get(url, params=params, stream=True) - with open(filepath, 'wb') as f: - for chunk in r.iter_content(chunk_size=1024): - if chunk: # filter out keep-alive new chunks - f.write(chunk) diff --git a/analysis/webservice/apidocs/openapi.yml b/analysis/webservice/apidocs/openapi.yml index 2754463..bb20c1a 100644 --- a/analysis/webservice/apidocs/openapi.yml +++ b/analysis/webservice/apidocs/openapi.yml @@ -333,10 +333,10 @@ paths: application/json: schema: $ref: '#/components/schemas/Error' - /domssubset: + /cdmssubset: get: - summary: Subset DOMS sources given the search domain - operationId: domssubset + summary: Subset CDMS sources given the search domain + operationId: cdmssubset tags: - Subsetting parameters:
