This is an automated email from the ASF dual-hosted git repository. skperez pushed a commit to branch SDAP-505 in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit 3c9aecad8bb18c77404828e2aee600932eba146f Author: skorper <[email protected]> AuthorDate: Fri Dec 22 12:44:23 2023 -0800 insitu DOMS+Parquet support. Pydantic for model normalization --- analysis/tests/algorithms_spark/test_insitu.py | 80 ++++++++++++++ analysis/tests/data/edge_insitu_response.json | 42 ++++++++ analysis/tests/data/parquet_insitu_response.json | 73 +++++++++++++ analysis/webservice/algorithms/doms/config.py | 4 +- analysis/webservice/algorithms/doms/insitu.py | 132 +++++++++++++++++------ 5 files changed, 298 insertions(+), 33 deletions(-) diff --git a/analysis/tests/algorithms_spark/test_insitu.py b/analysis/tests/algorithms_spark/test_insitu.py new file mode 100644 index 0000000..b879945 --- /dev/null +++ b/analysis/tests/algorithms_spark/test_insitu.py @@ -0,0 +1,80 @@ +from webservice.algorithms.doms import insitu +import json + + +""" +From Nga: + +https://doms.coaps.fsu.edu/ws/search/samos_cdms?startTime=2018-01-14T17%3A00%3A00Z&endTime=2018-01-14T17%3A53%3A00Z&bbox=-89%2C29%2C-88%2C30&itemsPerPage=1 + +https://doms.jpl.nasa.gov/insitu/1.0/query_data_doms?startIndex=0&itemsPerPage=1&provider=Florida%20State%20University%2C%20COAPS&project=SAMOS&startTime=2018-01-14T17%3A00%3A00Z&endTime=2018-01-14T17%3A53%3A00Z&bbox=-89%2C29%2C-88%2C30&minDepth=-1000&maxDepth=1000 +""" + + +def test_query_insitu(): + # https://doms.jpl.nasa.gov/insitu/1.0/query_data_doms_custom_pagination? + # itemsPerPage=20000 + # startTime=2017-05-01T01:54:45Z + # endTime=2017-05-01T16:41:01Z + # bbox=-100.0,20.0,-79.0,30.0 + # minDepth=-20.0 + # maxDepth=10.0 + # provider=Florida+State+University,+COAPS + # project=SAMOS + # platform=30 + response = insitu.query_insitu( + dataset='SAMOS', + variable=None, + start_time='2017-05-01T01:54:45Z', + end_time='2017-05-01T16:41:01Z', + bbox='-100.0,20.0,-79.0,30.0', + platform=30, + depth_min=-20.0, + depth_max=10.0, + items_per_page=1000 + ) + + print(f'{response=}') + # assert False + +def test_query_insitu_doms(): + # http://doms.coaps.fsu.edu/ws/search/samos? + # startTime=2012-08-01T00:00:00Z& + # endTime=2013-10-31T23:59:59Z& + # bbox=-45,15,-30,30 + response = insitu.query_insitu( + dataset='SAMOS', + variable=None, + start_time='2017-05-01T01:54:45Z', + end_time='2017-05-01T16:41:01Z', + bbox='-100.0,20.0,-79.0,30.0', + platform=None, # 30 seems to reduce the results to 0 + depth_min=-20.0, + depth_max=10.0, + items_per_page=1000 + ) + + print(f'{response=}') + + +def test_insitu_result_serialization(): + with open('../data/edge_insitu_response.json') as fp: + edge_result = json.loads(fp.read()) + + with open('../data/parquet_insitu_response.json') as fp: + parquet_result = json.loads(fp.read()) + + parquet_result_model = insitu.InsituResult(**parquet_result) + edge_result_model = insitu.InsituResult(**edge_result) + + assert parquet_result_model.results[0].platform == 30 + assert edge_result_model.results[0].platform is None + + assert parquet_result_model.results[0].metadata == 'WTEO_20180114v30001_0048-99999.0' + assert edge_result_model.results[0].metadata == 'WTEO_20180114v30001_0048-99999.0' + + assert parquet_result_model.total_results == 4 + assert edge_result_model.total_results == 4 + + + diff --git a/analysis/tests/data/edge_insitu_response.json b/analysis/tests/data/edge_insitu_response.json new file mode 100644 index 0000000..7537d78 --- /dev/null +++ b/analysis/tests/data/edge_insitu_response.json @@ -0,0 +1,42 @@ +{ + "last": "http://doms.coaps.fsu.edu/ws/search/samos_cdms?startIndex=4&endTime=2018-01-14T17%3A53%3A00Z&itemsPerPage=1&bbox=-89%2C29%2C-88%2C30&startTime=2018-01-14T17%3A00%3A00Z", + "next": "http://doms.coaps.fsu.edu/ws/search/samos_cdms?startIndex=1&endTime=2018-01-14T17%3A53%3A00Z&itemsPerPage=1&bbox=-89%2C29%2C-88%2C30&startTime=2018-01-14T17%3A00%3A00Z", + "first": "http://doms.coaps.fsu.edu/ws/search/samos_cdms?startIndex=0&endTime=2018-01-14T17%3A53%3A00Z&itemsPerPage=1&bbox=-89%2C29%2C-88%2C30&startTime=2018-01-14T17%3A00%3A00Z", + "results": [ + { + "time": "2018-01-14T17:53:00Z", + "latitude": 30, + "longitude": -88.43, + "depth": -99999, + "air_pressure": null, + "air_pressure_quality": null, + "air_temperature": 4.81, + "air_temperature_quality": 1, + "dew_point_temperature": null, + "dew_point_temperature_quality": null, + "downwelling_shortwave_flux_in_air": null, + "downwelling_shortwave_flux_in_air_quality": null, + "downwelling_longwave_flux_in_air": null, + "sea_water_temperature": 17.3, + "sea_water_temperature_quality": 1, + "eastward_wind": null, + "northward_wind": null, + "relative_humidity": null, + "relative_humidity_quality": null, + "sea_water_salinity": 36.02, + "sea_water_salinity_quality": 1, + "surface_downwelling_photosynthetic_photon_flux_in_air": null, + "surface_downwelling_photosynthetic_photon_flux_in_air_quality": null, + "wind_speed": null, + "wind_speed_quality": null, + "wind_component_quality": null, + "platform": null, + "device": null, + "mission": null, + "metadata": "WTEO_20180114v30001_0048-99999.0" + } + ], + "totalResults": 4, + "startIndex": 0, + "itemsPerPage": 1 +} \ No newline at end of file diff --git a/analysis/tests/data/parquet_insitu_response.json b/analysis/tests/data/parquet_insitu_response.json new file mode 100644 index 0000000..0979db7 --- /dev/null +++ b/analysis/tests/data/parquet_insitu_response.json @@ -0,0 +1,73 @@ +{ + "total": 4, + "results": [ + { + "time": "2018-01-14T17:53:00Z", + "latitude": 30, + "longitude": -88.43, + "depth": -99999, + "air_pressure": null, + "air_pressure_quality": null, + "air_temperature": 4.81, + "air_temperature_quality": 1, + "dew_point_temperature": null, + "dew_point_temperature_quality": null, + "downwelling_longwave_flux_in_air": null, + "downwelling_longwave_flux_in_air_quality": null, + "downwelling_longwave_radiance_in_air": null, + "downwelling_longwave_radiance_in_air_quality": null, + "downwelling_shortwave_flux_in_air": null, + "downwelling_shortwave_flux_in_air_quality": null, + "mass_concentration_of_chlorophyll_in_sea_water": null, + "mass_concentration_of_chlorophyll_in_sea_water_quality": null, + "rainfall_rate": null, + "rainfall_rate_quality": null, + "relative_humidity": null, + "relative_humidity_quality": null, + "sea_surface_salinity": null, + "sea_surface_salinity_quality": null, + "sea_surface_skin_temperature": null, + "sea_surface_skin_temperature_quality": null, + "sea_surface_subskin_temperature": null, + "sea_surface_subskin_temperature_quality": null, + "sea_surface_temperature": null, + "sea_surface_temperature_quality": null, + "sea_water_density": null, + "sea_water_density_quality": null, + "sea_water_electrical_conductivity": null, + "sea_water_electrical_conductivity_quality": null, + "sea_water_practical_salinity": null, + "sea_water_practical_salinity_quality": null, + "sea_water_salinity": 36.02, + "sea_water_salinity_quality": 1, + "sea_water_temperature": 17.3, + "sea_water_temperature_quality": 1, + "surface_downwelling_photosynthetic_photon_flux_in_air": null, + "surface_downwelling_photosynthetic_photon_flux_in_air_quality": null, + "wet_bulb_temperature": null, + "wet_bulb_temperature_quality": null, + "wind_speed": null, + "wind_speed_quality": null, + "wind_from_direction": null, + "wind_to_direction": null, + "wind_from_direction_quality": null, + "wind_to_direction_quality": null, + "eastward_wind": null, + "northward_wind": null, + "wind_component_quality": null, + "platform": { + "code": "30" + }, + "device": null, + "meta": "WTEO_20180114v30001_0048-99999.0", + "provider": "Florida State University, COAPS", + "project": "SAMOS", + "platform_code": "30", + "job_id": "47a61aac-6bc2-41a2-9568-920715c65b66" + } + ], + "last": "http://doms.jpl.nasa.gov/insitu/1.0/query_data_doms?startIndex=3&itemsPerPage=1&provider=Florida State University, COAPS&project=SAMOS&startTime=2018-01-14T17:00:00Z&endTime=2018-01-14T17:53:00Z&bbox=-89,29,-88,30&minDepth=-1000&maxDepth=1000", + "first": "http://doms.jpl.nasa.gov/insitu/1.0/query_data_doms?startIndex=0&itemsPerPage=1&provider=Florida State University, COAPS&project=SAMOS&startTime=2018-01-14T17:00:00Z&endTime=2018-01-14T17:53:00Z&bbox=-89,29,-88,30&minDepth=-1000&maxDepth=1000", + "next": "http://doms.jpl.nasa.gov/insitu/1.0/query_data_doms?startIndex=1&itemsPerPage=1&provider=Florida State University, COAPS&project=SAMOS&startTime=2018-01-14T17:00:00Z&endTime=2018-01-14T17:53:00Z&bbox=-89,29,-88,30&minDepth=-1000&maxDepth=1000", + "prev": "http://doms.jpl.nasa.gov/insitu/1.0/query_data_doms?startIndex=0&itemsPerPage=1&provider=Florida State University, COAPS&project=SAMOS&startTime=2018-01-14T17:00:00Z&endTime=2018-01-14T17:53:00Z&bbox=-89,29,-88,30&minDepth=-1000&maxDepth=1000" +} \ No newline at end of file diff --git a/analysis/webservice/algorithms/doms/config.py b/analysis/webservice/algorithms/doms/config.py index 8ccc748..f16c1ed 100644 --- a/analysis/webservice/algorithms/doms/config.py +++ b/analysis/webservice/algorithms/doms/config.py @@ -103,7 +103,7 @@ INSITU_PROVIDER_MAP = [ ENDPOINTS = [ { "name": "samos", - "url": "https://doms.coaps.fsu.edu/ws/search/samos_cdms", + "url": "https://doms.coaps.fsu.edu/ws/search/samos", "fetchParallel": True, "fetchThreads": 8, "itemsPerPage": 1000, @@ -150,7 +150,7 @@ try: ENDPOINTS = [ { "name": "samos", - "url": "https://doms.coaps.fsu.edu/ws/search/samos_cdms", + "url": "https://doms.coaps.fsu.edu/ws/search/samos", "fetchParallel": True, "fetchThreads": 8, "itemsPerPage": 1000, diff --git a/analysis/webservice/algorithms/doms/insitu.py b/analysis/webservice/algorithms/doms/insitu.py index ae35b4a..55cbe1c 100644 --- a/analysis/webservice/algorithms/doms/insitu.py +++ b/analysis/webservice/algorithms/doms/insitu.py @@ -20,6 +20,99 @@ import logging import requests from datetime import datetime from webservice.algorithms.doms import config as insitu_endpoints +from pydantic import BaseModel, Field, AliasChoices, ConfigDict, field_validator +from typing import List, Optional, Dict, Union + + +class InsituRecord(BaseModel): + time: str + latitude: float + longitude: float + depth: float + platform: Optional[int] = None + device: Optional[float] + mission: Optional[str] = None + metadata: str = Field(validation_alias=AliasChoices('meta', 'metadata')) + provider: Optional[str] = None + project: Optional[str] = None + platform_code: Optional[str] = None + job_id: Optional[str] = None + + model_config = ConfigDict( + extra='allow', + ) + + @field_validator('platform', mode='before') + @classmethod + def transform(cls, raw_platform: Union[int, Dict[str, str]]) -> Optional[int]: + if isinstance(raw_platform, dict): + return raw_platform.get('code') + return raw_platform + + def get_variables(self): + return self.model_extra + + +class InsituResult(BaseModel): + last: str = str + prev: str = str + next: str = str + first: str = str + total_results: int = Field(validation_alias=AliasChoices('total', 'totalResults')) + results: List[InsituRecord] + + +def query_insitu_edge(dataset, params, session=None, stats=True): + params.update({ + 'stats': str(stats).lower(), + }) + + return query_insitu_api(session, insitu_endpoints.getEndpointByName(dataset)['url'], params) # TODO convert to shared entity + + +def query_insitu_parquet(dataset, params, session=None, **kwargs): + provider = insitu_endpoints.get_provider_name(dataset) + project = insitu_endpoints.get_project_name(dataset) + + params.update({ + 'provider': provider, + 'project': project + }) + + return query_insitu_api(session, insitu_endpoints.getEndpoint(provider, dataset), params) # TODO convert to shared entity + + +def query_insitu_api(session, endpoint, params): + insitu_response = {} + + # Page through all insitu results + next_page_url = endpoint + while next_page_url is not None and next_page_url != 'NA': + if session is not None: + response = session.get(next_page_url, params=params) + else: + response = requests.get(next_page_url, params=params) + + logging.info(f'Insitu request {response.url}') + print(f'Insitu request {response.url}') + + response.raise_for_status() + insitu_page_response = response.json() + + if not insitu_response: + insitu_response = insitu_page_response + else: + insitu_response['results'].extend(insitu_page_response['results']) + + next_page_url = insitu_page_response.get('next', None) + params = {} # Remove params, they are already included in above URL + + return insitu_response + + +def get_query_insitu_func(): + # return query_insitu_parquet # TODO upgrade this logic -- how to determine which API to query? Could add a new field to config + return query_insitu_edge # TODO upgrade this logic -- how to determine which API to query? Could add a new field to config def query_insitu_schema(): @@ -35,8 +128,7 @@ def query_insitu_schema(): return response.json() -def query_insitu(dataset, variable, start_time, end_time, bbox, platform, depth_min, depth_max, - items_per_page=20000, session=None): +def query_insitu(dataset, variable, start_time, end_time, bbox, platform, depth_min, depth_max, items_per_page=20000, session=None): """ Query insitu API, page through results, and aggregate """ @@ -52,45 +144,23 @@ def query_insitu(dataset, variable, start_time, end_time, bbox, platform, depth_ # Assume we were passed a properly formatted string pass - provider = insitu_endpoints.get_provider_name(dataset) - project = insitu_endpoints.get_project_name(dataset) - params = { - 'itemsPerPage': items_per_page, 'startTime': start_time, 'endTime': end_time, 'bbox': bbox, 'minDepth': depth_min, 'maxDepth': depth_max, - 'provider': provider, - 'project': project, 'platform': platform, + 'itemsPerPage': items_per_page } if variable is not None: params['variable'] = variable - insitu_response = {} + query_insitu_func = get_query_insitu_func() - # Page through all insitu results - next_page_url = insitu_endpoints.getEndpoint(provider, dataset) - while next_page_url is not None and next_page_url != 'NA': - if session is not None: - response = session.get(next_page_url, params=params) - else: - response = requests.get(next_page_url, params=params) - - logging.info(f'Insitu request {response.url}') - - response.raise_for_status() - insitu_page_response = response.json() - - if not insitu_response: - insitu_response = insitu_page_response - else: - insitu_response['results'].extend(insitu_page_response['results']) - - next_page_url = insitu_page_response.get('next', None) - params = {} # Remove params, they are already included in above URL - - return insitu_response + return query_insitu_func( + dataset=dataset, + params=params, + session=session + )
