This is an automated email from the ASF dual-hosted git repository.

skperez pushed a commit to branch SDAP-371
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git

commit e20466c7fe77aceebf2fda997cb3cb41b32c6753
Author: skorper <[email protected]>
AuthorDate: Thu Jul 7 16:18:26 2022 -0700

    Updated doms subsetter to support insitu subsetting
---
 analysis/tests/algorithms/test_subsetter.py      | 126 +++++++++++++++++++++++
 analysis/webservice/algorithms/doms/insitu.py    |  66 ++++++++++++
 analysis/webservice/algorithms/doms/subsetter.py | 103 +++++++++++-------
 analysis/webservice/apidocs/openapi.yml          |   6 +-
 4 files changed, 262 insertions(+), 39 deletions(-)

diff --git a/analysis/tests/algorithms/test_subsetter.py b/analysis/tests/algorithms/test_subsetter.py
new file mode 100644
index 0000000..af99348
--- /dev/null
+++ b/analysis/tests/algorithms/test_subsetter.py
@@ -0,0 +1,126 @@
+import shutil
+import tempfile
+from os import listdir
+from os.path import dirname, join, realpath, abspath, isfile
+
+import pytest
+from webservice.algorithms.doms.subsetter import SubsetResult
+
+
+def test_to_csv(input_data):
+    """
+    Test that csv output contains the expected contents
+    """
+    result = SubsetResult(
+        results=input_data,
+        meta=None
+    )
+
+    csv_result = result.toCsv()
+
+    expected_result_ascat = '''\
+longitude,latitude,time,wind_speed,wind_to_direction\r
+179.97830000000002,-27.6048,2018-09-24T09:10:03Z,0.32999998331069946,98.30000305175781\r
+179.9642,-27.49519,2018-09-24T09:10:05Z,0.699999988079071,179.5'''
+
+    expected_result_icoads = '''\
+longitude,latitude,time,depth,platform_code,project,provider,sea_water_temperature,sea_water_temperature_quality\r
+160.47,-28.38,2018-09-24T08:00:00Z,-99999.0,42,ICOADS Release 3.0,NCAR,18.6,1\r
+179.88,-27.14,2018-09-24T08:00:00Z,-99999.0,42,ICOADS Release 3.0,NCAR,20.6,1'''
+
+    assert 'ASCATB-L2-Coastal' in csv_result
+    assert csv_result['ASCATB-L2-Coastal'] == expected_result_ascat
+
+    assert 'ICOADS Release 3.0' in csv_result
+    assert csv_result['ICOADS Release 3.0'] == expected_result_icoads
+
+
+def test_to_zip(input_data, temp_dir):
+    """
+    Test that zip output contains the expected contents
+    """
+    result = SubsetResult(
+        results=input_data,
+        meta=None
+    )
+
+    zip_result = result.toZip()
+    zip_location = join(temp_dir, 'response.zip')
+    unzip_location = join(temp_dir, 'zip_contents')
+
+    with open(zip_location, 'wb') as out_zip:
+        out_zip.write(zip_result)
+
+    shutil.unpack_archive(zip_location, unzip_location)
+
+    zip_contents = [
+        f for f in listdir(unzip_location) if isfile(join(unzip_location, f))
+    ]
+
+    assert len(zip_contents) == 2
+    assert 'ASCATB-L2-Coastal.csv' in zip_contents
+    assert 'ICOADS Release 3.0.csv' in zip_contents
+
+
[email protected]
+def temp_dir():
+    test_dir = dirname(realpath(__file__))
+    test_data_dir = abspath(join(test_dir, '..', 'data'))
+    temp_data_dir = tempfile.mkdtemp(dir=test_data_dir)
+    yield temp_data_dir
+    shutil.rmtree(temp_data_dir)
+
+
[email protected]
+def input_data():
+    data_dict = {
+        "ASCATB-L2-Coastal": [
+            {
+                "latitude": -27.6048,
+                "longitude": 179.97830000000002,
+                "time": 1537780203,
+                "data": {
+                    "wind_speed": 0.32999998331069946,
+                    "wind_to_direction": 98.30000305175781
+                }
+            },
+            {
+                "latitude": -27.49519,
+                "longitude": 179.9642,
+                "time": 1537780205,
+                "data": {
+                    "wind_speed": 0.699999988079071,
+                    "wind_to_direction": 179.5
+                }
+            }
+        ],
+        "ICOADS Release 3.0": [
+            {
+                "latitude": -28.38,
+                "longitude": 160.47,
+                "time": 1537776000.0,
+                "data": {
+                    "depth": -99999.0,
+                    "provider": "NCAR",
+                    "project": "ICOADS Release 3.0",
+                    "platform_code": "42",
+                    "sea_water_temperature": 18.6,
+                    "sea_water_temperature_quality": 1
+                }
+            },
+            {
+                "latitude": -27.14,
+                "longitude": 179.88,
+                "time": 1537776000.0,
+                "data": {
+                    "depth": -99999.0,
+                    "provider": "NCAR",
+                    "project": "ICOADS Release 3.0",
+                    "platform_code": "42",
+                    "sea_water_temperature": 20.6,
+                    "sea_water_temperature_quality": 1
+                }
+            }
+        ]
+    }
+    yield data_dict
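
The new test module above can be run on its own; a minimal invocation,
assuming pytest is installed and the working directory is the repository's
analysis/ directory (so the webservice package is importable):

    pytest tests/algorithms/test_subsetter.py -v
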
diff --git a/analysis/webservice/algorithms/doms/insitu.py b/analysis/webservice/algorithms/doms/insitu.py
new file mode 100644
index 0000000..0344839
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/insitu.py
@@ -0,0 +1,66 @@
+"""
+Module for querying CDMS In-Situ API
+"""
+import logging
+import requests
+from datetime import datetime
+from webservice.algorithms.doms import config as insitu_endpoints
+
+
+def query_insitu(dataset, variable, start_time, end_time, bbox, platform, depth_min, depth_max,
+               items_per_page=1000, session=None):
+    """
+    Query insitu API, page through results, and aggregate
+    """
+    try:
+        start_time = datetime.utcfromtimestamp(start_time).strftime('%Y-%m-%dT%H:%M:%SZ')
+    except TypeError:
+        # Assume we were passed a properly formatted string
+        pass
+
+    try:
+        end_time = datetime.utcfromtimestamp(end_time).strftime('%Y-%m-%dT%H:%M:%SZ')
+    except TypeError:
+        # Assume we were passed a properly formatted string
+        pass
+
+    provider = insitu_endpoints.get_provider_name(dataset)
+
+    params = {
+        'itemsPerPage': items_per_page,
+        'startTime': start_time,
+        'endTime': end_time,
+        'bbox': bbox,
+        'minDepth': depth_min,
+        'maxDepth': depth_max,
+        'provider': provider,
+        'project': dataset,
+        'platform': platform,
+    }
+
+    if variable is not None:
+        params['variable'] = variable
+
+    insitu_response = {}
+
+    # Page through all insitu results
+    next_page_url = insitu_endpoints.getEndpoint()
+    while next_page_url is not None and next_page_url != 'NA':
+        logging.debug(f'Insitu request {next_page_url}')
+        if session is not None:
+            response = session.get(next_page_url, params=params)
+        else:
+            response = requests.get(next_page_url, params=params)
+
+        response.raise_for_status()
+        insitu_page_response = response.json()
+
+        if not insitu_response:
+            insitu_response = insitu_page_response
+        else:
+            insitu_response['results'].extend(insitu_page_response['results'])
+
+        next_page_url = insitu_page_response.get('next', None)
+        params = {}  # Remove params, they are already included in above URL
+
+    return insitu_response
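
For reference, a minimal sketch of calling the new helper directly. The
dataset, platform code, and time range below are illustrative values only;
the endpoint actually queried comes from the doms config module:

    from webservice.algorithms.doms.insitu import query_insitu

    # Illustrative values; valid dataset and platform codes depend on the
    # configured CDMS in-situ endpoint.
    response = query_insitu(
        dataset='ICOADS Release 3.0',
        variable='sea_water_temperature',
        start_time=1537776000,           # epoch seconds; converted to ISO 8601 internally
        end_time=1537862400,
        bbox='160.0,-30.0,180.0,-25.0',  # min_lon,min_lat,max_lon,max_lat
        platform='42',
        depth_min=0,
        depth_max=5
    )
    print(len(response.get('results', [])))
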
diff --git a/analysis/webservice/algorithms/doms/subsetter.py b/analysis/webservice/algorithms/doms/subsetter.py
index 824f113..12021b6 100644
--- a/analysis/webservice/algorithms/doms/subsetter.py
+++ b/analysis/webservice/algorithms/doms/subsetter.py
@@ -16,16 +16,14 @@
 import logging
 import os
 import io
-import tempfile
 import zipfile
 from pytz import timezone
 from datetime import datetime
 
-import requests
-
 from . import BaseDomsHandler
 from webservice.NexusHandler import nexus_handler
 from webservice.webmodel import NexusProcessingException, NexusResults
+from webservice.algorithms.doms.insitu import query_insitu
 
 ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
 EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
@@ -55,7 +53,7 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
         "parameter": {
             "name": "Data Parameter",
             "type": "string",
-            "description": "The parameter of interest. One of 'sst', 'sss', 
'wind'. Required"
+            "description": "The insitu parameter of interest. Only required if 
insitu is present."
         },
         "startTime": {
             "name": "Start Time",
@@ -76,12 +74,12 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
         "depthMin": {
             "name": "Minimum Depth",
             "type": "float",
-            "description": "Minimum depth of measurements. Must be less than 
depthMax. Optional"
+            "description": "Minimum depth of measurements. Must be less than 
depthMax. Default 0. Optional"
         },
         "depthMax": {
             "name": "Maximum Depth",
             "type": "float",
-            "description": "Maximum depth of measurements. Must be greater 
than depthMin. Optional"
+            "description": "Maximum depth of measurements. Must be greater 
than depthMin. Default 5. Optional"
         },
         "platforms": {
             "name": "Platforms",
@@ -116,11 +114,13 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
                 matchup_ds_names = matchup_ds_names.split(',')
             except:
                 raise NexusProcessingException(reason="'insitu' argument 
should be a comma-seperated list", code=400)
+        else:
+            matchup_ds_names = []
 
         parameter_s = request.get_argument('parameter', None)
-        if parameter_s not in ['sst', 'sss', 'wind']:
+        if parameter_s is None and len(matchup_ds_names) > 0:
             raise NexusProcessingException(
-                reason="Parameter %s not supported. Must be one of 'sst', 
'sss', 'wind'." % parameter_s, code=400)
+                reason="Parameter must be provided for insitu subset." % 
parameter_s, code=400)
 
         try:
             start_time = request.get_start_datetime()
@@ -150,8 +150,8 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
                 reason="'b' argument is required. Must be comma-delimited 
float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, 
Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
                 code=400)
 
-        depth_min = request.get_decimal_arg('depthMin', default=None)
-        depth_max = request.get_decimal_arg('depthMax', default=None)
+        depth_min = request.get_decimal_arg('depthMin', default=0)
+        depth_max = request.get_decimal_arg('depthMax', default=5)
 
        if depth_min is not None and depth_max is not None and depth_min >= depth_max:
             raise NexusProcessingException(
@@ -180,15 +180,15 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
             min_lon = bounding_polygon.bounds[0]
             max_lon = bounding_polygon.bounds[2]
 
-            tiles = self._get_tile_service().get_tiles_bounded_by_box(min_lat, max_lat, min_lon,
-                                                                      max_lon, primary_ds_name, start_time,
-                                                                      end_time)
-        else:
-            tiles = []  # todo
-            # tiles = self._get_tile_service().get_tiles_by_metadata(metadata_filter, ds, start_time,
-            #                                                        end_time)
+        tiles = self._get_tile_service().get_tiles_bounded_by_box(
+            min_lat=min_lat, max_lat=max_lat, min_lon=min_lon,
+            max_lon=max_lon, ds=primary_ds_name, start_time=start_time,
+            end_time=end_time
+        )
 
+        # Satellite
         data = []
+        data_dict = {}
         for tile in tiles:
             for nexus_point in tile.nexus_point_generator():
                 if tile.is_multi:
@@ -204,7 +204,42 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
                     'time': nexus_point.time,
                     'data': data_points
                 })
-        data_dict = {primary_ds_name: data}
+        data_dict[primary_ds_name] = data
+
+        # In-situ
+        non_data_fields = [
+            'meta',
+            'platform',
+            'job_id',
+            'latitude',
+            'longitude',
+            'time',
+        ]
+        for insitu_dataset in matchup_ds_names:
+            data = []
+            edge_response = query_insitu(
+                dataset=insitu_dataset,
+                variable=parameter_s,
+                start_time=start_time,
+                end_time=end_time,
+                bbox=','.join(list(map(str, [min_lon, min_lat, max_lon, max_lat]))),
+                platform=platforms,
+                depth_min=depth_min,
+                depth_max=depth_max
+            )
+            for result in edge_response['results']:
+                data_points = {
+                    key: value for key, value in result.items()
+                    if value is not None and key not in non_data_fields
+                }
+                data.append({
+                    'latitude': result['latitude'],
+                    'longitude': result['longitude'],
+                    'time': (datetime.strptime(result['time'], '%Y-%m-%dT%H:%M:%SZ') - datetime.utcfromtimestamp(0)).total_seconds(),
+                    'data': data_points
+                })
+            data_dict[insitu_dataset] = data
+
         if len(tiles) > 0:
             meta = [tile.get_summary() for tile in tiles]
         else:
@@ -226,21 +261,21 @@ class SubsetResult(NexusResults):
 
     def toCsv(self):
         """
-        Convert results to CSV
+        Convert results to csv
         """
-        rows = []
-
-        headers = [
-            'longitude',
-            'latitude',
-            'time'
-        ]
-
         dataset_results = self.results()
         csv_results = {}
 
         for dataset_name, results in dataset_results.items():
-            data_variables = set([keys for result in results for keys in result['data'].keys()])
+            rows = []
+
+            headers = [
+                'longitude',
+                'latitude',
+                'time'
+            ]
+            data_variables = list(set([keys for result in results for keys in result['data'].keys()]))
+            data_variables.sort()
             headers.extend(data_variables)
             for i, result in enumerate(results):
                 cols = []
@@ -259,6 +294,10 @@ class SubsetResult(NexusResults):
         return csv_results
 
     def toZip(self):
+        """
+        Convert csv results to zip. Each subsetted dataset becomes a csv
+        inside the zip named as <dataset-short-name>.csv
+        """
         csv_results = self.toCsv()
 
         buffer = io.BytesIO()
@@ -271,11 +310,3 @@ class SubsetResult(NexusResults):
 
     def cleanup(self):
         os.remove(self.zip_path)
-
-
-def download_file(url, filepath, session, params=None):
-    r = session.get(url, params=params, stream=True)
-    with open(filepath, 'wb') as f:
-        for chunk in r.iter_content(chunk_size=1024):
-            if chunk:  # filter out keep-alive new chunks
-                f.write(chunk)
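
One detail worth a worked check: the handler converts each in-situ ISO 8601
timestamp back to epoch seconds so it lines up with the satellite points.
A quick sketch of that arithmetic (using utcfromtimestamp keeps the result
timezone-independent):

    from datetime import datetime

    # 2018-09-24T08:00:00Z is the ICOADS time used in the test fixture above
    seconds = (datetime.strptime('2018-09-24T08:00:00Z', '%Y-%m-%dT%H:%M:%SZ')
               - datetime.utcfromtimestamp(0)).total_seconds()
    print(seconds)  # 1537776000.0, matching the fixture's 'time' value
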
diff --git a/analysis/webservice/apidocs/openapi.yml b/analysis/webservice/apidocs/openapi.yml
index 2754463..bb20c1a 100644
--- a/analysis/webservice/apidocs/openapi.yml
+++ b/analysis/webservice/apidocs/openapi.yml
@@ -333,10 +333,10 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/Error'
-  /domssubset:
+  /cdmssubset:
     get:
-      summary: Subset DOMS sources given the search domain
-      operationId: domssubset
+      summary: Subset CDMS sources given the search domain
+      operationId: cdmssubset
       tags:
         - Subsetting
       parameters:
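
With the path renamed, a request against the subsetter could be sketched as
follows. The host, port, and the 'dataset' argument name for the primary
dataset are assumptions; 'insitu', 'parameter', 'b', 'depthMin', 'depthMax',
and 'platforms' come from the handler diff above:

    import requests

    # Host/port and the primary dataset argument name are assumptions here;
    # the remaining parameter names appear in the handler above.
    response = requests.get('http://localhost:8083/cdmssubset', params={
        'dataset': 'ASCATB-L2-Coastal',
        'insitu': 'ICOADS Release 3.0',
        'parameter': 'sea_water_temperature',
        'startTime': '2018-09-24T00:00:00Z',
        'endTime': '2018-09-25T00:00:00Z',
        'b': '160.0,-30.0,180.0,-25.0',
        'depthMin': 0,
        'depthMax': 5,
        'platforms': '42',
    })
    print(response.status_code)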
