This is an automated email from the ASF dual-hosted git repository. skperez pushed a commit to branch SDAP-506 in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit 32ca3d709237d324decada84fe92a4a4044b6521 Author: skorper <[email protected]> AuthorDate: Fri Jan 5 15:33:49 2024 -0800 stac catalog --- .../webservice/algorithms/doms/ResultsStorage.py | 15 +- analysis/webservice/algorithms/doms/StacCatalog.py | 166 +++++++++++++++++++++ analysis/webservice/algorithms/doms/__init__.py | 1 + .../webservice/webmodel/NexusExecutionResults.py | 6 + 4 files changed, 180 insertions(+), 8 deletions(-) diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py b/analysis/webservice/algorithms/doms/ResultsStorage.py index 39db27b..6b4cc1c 100644 --- a/analysis/webservice/algorithms/doms/ResultsStorage.py +++ b/analysis/webservice/algorithms/doms/ResultsStorage.py @@ -286,7 +286,7 @@ class ResultsRetrieval(AbstractResultsContainer): execution_id = uuid.UUID(execution_id) params = self.retrieveParams(execution_id) - stats = self.__retrieveStats(execution_id) + stats = self.retrieveStats(execution_id) data = self.__retrieveData(execution_id, trim_data=trim_data, page_num=page_num, page_size=page_size) return params, stats, data @@ -357,19 +357,18 @@ class ResultsRetrieval(AbstractResultsContainer): return entry - def __retrieveStats(self, id): - cql = "SELECT num_gridded_matched, num_insitu_matched, time_to_complete FROM doms_execution_stats where execution_id = %s limit 1" + def retrieveStats(self, id): + cql = "SELECT num_gridded_matched, num_insitu_matched, time_to_complete, num_unique_secondaries FROM doms_execution_stats where execution_id = %s limit 1" rows = self._session.execute(cql, (id,)) for row in rows: stats = { - "timeToComplete": row.time_to_complete, - "numSecondaryMatched": row.num_insitu_matched, - "numPrimaryMatched": row.num_gridded_matched, + 'timeToComplete': row.time_to_complete, + 'numSecondaryMatched': row.num_insitu_matched, + 'numPrimaryMatched': row.num_gridded_matched, + 'numUniqueSecondaries': row.num_unique_secondaries } return stats - raise NexusProcessingException(reason=f'No stats found for id {str(id)}', code=404) - def retrieveParams(self, id): cql = "SELECT * FROM doms_params where execution_id = %s limit 1" rows = self._session.execute(cql, (id,)) diff --git a/analysis/webservice/algorithms/doms/StacCatalog.py b/analysis/webservice/algorithms/doms/StacCatalog.py new file mode 100644 index 0000000..2c1aa12 --- /dev/null +++ b/analysis/webservice/algorithms/doms/StacCatalog.py @@ -0,0 +1,166 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the 'License'); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import re +import uuid +from typing import List + +from webservice.NexusHandler import nexus_handler +from webservice.algorithms.doms.ResultsStorage import ResultsRetrieval +from webservice.webmodel import NexusProcessingException +from webservice.webmodel import NexusResults + +from . import BaseDomsHandler + + +class StacResults(NexusResults): + def __init__(self, contents): + NexusResults.__init__(self) + self.contents = contents + + def toJson(self): + return json.dumps(self.contents, indent=4) + + +@nexus_handler +class StacCatalog(BaseDomsHandler.BaseDomsQueryCalcHandler): + name = 'STAC Catalog Handler' + path = '^/cdmscatalog/?.*$' + description = '' + params = {} + singleton = True + + def __init__(self, tile_service_factory, config=None): + BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self, tile_service_factory) + self.config = config + + def construct_catalog(self, execution_id: str): + return { + 'stac_version': '1.0.0', + 'type': 'Catalog', + 'id': str(execution_id), + 'description': 'STAC Catalog for CDMS output', + 'links': [ + { + 'rel': 'collection', + 'href': f'https://{self.host}/cdmscatalog/{execution_id}/{output_format}', + 'title': f'Collection of pages for {execution_id} {output_format} output' + } + for output_format in ['CSV', 'JSON', 'NETCDF'] + ] + } + + def construct_collection(self, execution_id: str, output_format: str, + num_primary_matched: int, page_size: int, start_time: str, + end_time: str, bbox: List[float]): + links = [ + { + 'rel': 'self', + 'href': f'https://{self.host}/cdmscatalog/{execution_id}/{output_format}', + 'title': 'The current page', + 'type': 'application/json' + }, + { + 'rel': 'root', + 'href': f'https://{self.host}/cdmscatalog/{execution_id}', + 'title': f'Root catalog for {execution_id}', + } + ] + + url = f'https://{self.host}/cdmsresults?id={execution_id}&output={output_format}' + for page_num in range(1, num_primary_matched, page_size): + links.append({ + 'rel': 'data', + 'href': f'{url}&pageNum={page_num}&pageSize={page_size}' + }) + + return { + 'stac_version': '1.0.0', + 'type': 'Collection', + 'license': 'not-provided', + 'id': f'{execution_id}.{output_format}', + 'description': 'Collection of results for CDMS execution and result format', + 'extent': { + 'spatial': { + 'bbox': bbox + }, + 'temporal': { + 'interval': [start_time, end_time] + } + }, + 'links': links, + } + + def calc(self, request, **args): + page_size = request.get_int_arg('pageSize', default=1000) + url_path_regex = '^\/cdmscatalog\/?(?P<id>[a-zA-Z0-9-]*)\/?(?P<format>[a-zA-Z0-9]*)' + match = re.search(url_path_regex, request.requestHandler.request.path) + + execution_id = match.group('id') + output_format = match.group('format') + + self.host = request.requestHandler.request.host + + if not execution_id: + raise NexusProcessingException( + reason=f'Execution ID path param must be provided.', + code=400 + ) + + if execution_id: + try: + execution_id = uuid.UUID(execution_id) + except ValueError: + raise NexusProcessingException( + reason=f'"{execution_id}" is not a valid uuid', + code=400 + ) + + if output_format and output_format.upper() not in ['CSV', 'JSON', 'NETCDF']: + raise NexusProcessingException( + reason=f'"{output_format}" is not a valid format. Should be CSV, JSON, or NETCDF.', + code=400 + ) + + if execution_id and not output_format: + # Route to STAC catalog for execution + stac_output = self.construct_catalog(execution_id) + elif execution_id and output_format: + # Route to STAC collection for execution+format + + with ResultsRetrieval(self.config) as retrieval: + try: + execution_stats = retrieval.retrieveStats(execution_id) + execution_params = retrieval.retrieveParams(execution_id) + except NexusProcessingException: + execution_stats = {} + + num_primary_matched = execution_stats.get('numPrimaryMatched', 0) + start_time = execution_params['startTime'].isoformat() + end_time = execution_params['endTime'].isoformat() + bbox = list(map(float, execution_params['bbox'].split(','))) + + stac_output = self.construct_collection( + execution_id, output_format, num_primary_matched, page_size, + start_time, end_time, bbox + ) + else: + raise NexusProcessingException( + reason=f'Invalid path parameters were provided', + code=400 + ) + + return StacResults(stac_output) diff --git a/analysis/webservice/algorithms/doms/__init__.py b/analysis/webservice/algorithms/doms/__init__.py index bc568f8..7e5715f 100644 --- a/analysis/webservice/algorithms/doms/__init__.py +++ b/analysis/webservice/algorithms/doms/__init__.py @@ -20,6 +20,7 @@ from . import ExecutionCancel from . import DatasetListQuery from . import DomsInitialization from . import MatchupQuery +from . import StacCatalog from . import MetadataQuery from . import ResultsPlotQuery from . import ResultsRetrieval diff --git a/analysis/webservice/webmodel/NexusExecutionResults.py b/analysis/webservice/webmodel/NexusExecutionResults.py index d5c1204..be9d332 100644 --- a/analysis/webservice/webmodel/NexusExecutionResults.py +++ b/analysis/webservice/webmodel/NexusExecutionResults.py @@ -60,6 +60,12 @@ def construct_done(status, created, completed, execution_id, params, host): ('JSON', 'application/json'), ('NETCDF', 'binary/octet-stream') ] + job_body['links'].append({ + 'href': f'{host}/cdmscatalog/{execution_id}', + 'title': 'STAC Catalog for execution results', + 'type': 'application/json', + 'rel': 'stac' + }) data_links = [{ 'href': f'{host}/cdmsresults?id={execution_id}&output={output_format}', 'title': 'Download results',
