This is an automated email from the ASF dual-hosted git repository. skperez pushed a commit to branch SDAP-455 in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit 8725ff6fc35c24436323b89edc5cf9b91901e886 Author: skorper <[email protected]> AuthorDate: Tue May 9 17:46:54 2023 -0700 Large job tracking --- CHANGELOG.md | 1 + .../webservice/algorithms/doms/ExecutionCancel.py | 83 ++++++++++ .../webservice/algorithms/doms/ExecutionStatus.py | 69 ++++++++ .../webservice/algorithms/doms/ResultsStorage.py | 61 +++++-- analysis/webservice/algorithms/doms/__init__.py | 2 + analysis/webservice/algorithms_spark/Matchup.py | 176 ++++++++++++++------- .../NexusCalcSparkTornadoHandler.py} | 25 +-- analysis/webservice/apidocs/openapi.yml | 36 +++++ .../app_builders/HandlerArgsBuilder.py | 1 + .../request/handlers/NexusRequestHandler.py | 22 ++- .../webservice/webmodel/NexusExecutionResults.py | 149 +++++++++++++++++ docker/nexus-webapp/Dockerfile | 4 +- 12 files changed, 535 insertions(+), 94 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f509a7..5e524f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Deletebyquery: Parameter to set the number of rows to fetch from Solr. Speeds up time to gather tiles to delete; especially when there is a lot of them. - Added Saildrone's `baja_2018` insitu dataset. +- SDAP-455: Large job tracking ### Changed - SDAP-443: - Replacing DOMS terminology with CDMS terminology: diff --git a/analysis/webservice/algorithms/doms/ExecutionCancel.py b/analysis/webservice/algorithms/doms/ExecutionCancel.py new file mode 100644 index 0000000..c9ef347 --- /dev/null +++ b/analysis/webservice/algorithms/doms/ExecutionCancel.py @@ -0,0 +1,83 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the 'License'); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import uuid + +from webservice.NexusHandler import nexus_handler +from webservice.algorithms.doms.ResultsStorage import ResultsRetrieval +from webservice.webmodel import NexusExecutionResults +from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler +from datetime import datetime +from webservice.algorithms.doms.ResultsStorage import ResultsStorage +from webservice.webmodel.NexusExecutionResults import ExecutionStatus +from webservice.webmodel import NexusProcessingException + + +@nexus_handler +class ExecutionStatusHandler(NexusCalcSparkTornadoHandler): + name = 'Execution Status Handler' + path = '/job/cancel' + description = '' + params = {} + singleton = True + + def __init__(self, algorithm_config=None, sc=None, tile_service_factory=None, config=None): + NexusCalcSparkTornadoHandler.__init__( + self, + algorithm_config=algorithm_config, + sc=sc, + tile_service_factory=tile_service_factory + ) + self.tile_service_factory = tile_service_factory + self.config = config + + def calc(self, request, tornado_io_loop, **args): + execution_id = request.get_argument('id', None) + + try: + execution_id = uuid.UUID(execution_id) + except ValueError: + raise NexusProcessingException(reason='"id" argument must be a valid uuid', code=400) + + with ResultsRetrieval(self.config) as retrieval: + try: + execution_details = retrieval.retrieveExecution(execution_id) + 
except ValueError: + raise NexusProcessingException( + reason=f'Execution {execution_id} not found', + code=404 + ) + + job_status = NexusExecutionResults.ExecutionStatus(execution_details['status']) + + # Only proceed if status is "running". Otherwise, noop + if job_status == ExecutionStatus.RUNNING: + # Cancel Spark job + self._sc.cancelJobGroup(str(execution_id)) + + # Update job status to "cancelled" + end = datetime.utcnow() + with ResultsStorage(self.config) as storage: + storage.updateExecution( + execution_id, + completeTime=end, + status=ExecutionStatus.CANCELLED.value, + message=None, + stats=None, + results=None + ) + + # Redirect to job status endpoint + request.requestHandler.redirect(f'/job?id={execution_id}') diff --git a/analysis/webservice/algorithms/doms/ExecutionStatus.py b/analysis/webservice/algorithms/doms/ExecutionStatus.py new file mode 100644 index 0000000..90dced8 --- /dev/null +++ b/analysis/webservice/algorithms/doms/ExecutionStatus.py @@ -0,0 +1,69 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the 'License'); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import uuid + +from . 
import BaseDomsHandler +from webservice.NexusHandler import nexus_handler +from webservice.algorithms.doms.ResultsStorage import ResultsRetrieval +from webservice.webmodel import NexusExecutionResults +from webservice.webmodel import NexusProcessingException + + +@nexus_handler +class ExecutionStatusHandler(BaseDomsHandler.BaseDomsQueryCalcHandler): + name = 'Execution Status Handler' + path = '/job' + description = '' + params = {} + singleton = True + + def __init__(self, tile_service_factory, config=None): + BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self, tile_service_factory) + self.config = config + + def calc(self, request, **args): + execution_id = request.get_argument('id', None) + + try: + execution_id = uuid.UUID(execution_id) + except ValueError: + raise NexusProcessingException(reason='"id" argument must be a valid uuid', code=400) + + # Check if the job is done + with ResultsRetrieval(self.config) as retrieval: + try: + execution_details = retrieval.retrieveExecution(execution_id) + execution_params = retrieval.retrieveParams(execution_id) + except ValueError: + raise NexusProcessingException( + reason=f'Execution {execution_id} not found', + code=404 + ) + + # TODO check if job ID is valid. 
raise error w/meaningful error if not + + job_status = NexusExecutionResults.ExecutionStatus(execution_details['status']) + host = f'{request.requestHandler.request.protocol}://{request.requestHandler.request.host}' + + return NexusExecutionResults.NexusExecutionResults( + status=job_status, + created=execution_details['timeStarted'], + completed=execution_details['timeCompleted'], + execution_id=execution_id, + message=execution_details['message'], + params=execution_params, + host=host + ) diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py b/analysis/webservice/algorithms/doms/ResultsStorage.py index c989286..7ca8374 100644 --- a/analysis/webservice/algorithms/doms/ResultsStorage.py +++ b/analysis/webservice/algorithms/doms/ResultsStorage.py @@ -98,24 +98,39 @@ class ResultsStorage(AbstractResultsContainer): def __init__(self, config=None): AbstractResultsContainer.__init__(self, config) - def insertResults(self, results, params, stats, startTime, completeTime, userEmail, execution_id=None): - self._log.info('Beginning results write') + def insertInitialExecution(self, params, startTime, status, userEmail='', execution_id=None): + """ + Initial insert into database for CDMS matchup request. This + populates the execution and params table. 
+ """ if isinstance(execution_id, str): execution_id = uuid.UUID(execution_id) - execution_id = self.insertExecution(execution_id, startTime, completeTime, userEmail) + execution_id = self.__insertExecution(execution_id, startTime, None, userEmail, status) self.__insertParams(execution_id, params) - self.__insertStats(execution_id, stats) - self.__insertResults(execution_id, results) - self._log.info('Results write finished') return execution_id - def insertExecution(self, execution_id, startTime, completeTime, userEmail): + def updateExecution(self, execution_id, completeTime, status, message, stats, results): + self.__updateExecution(execution_id, completeTime, status, message) + if stats: + self.__insertStats(execution_id, stats) + if results: + self.__insertResults(execution_id, results) + + def __insertExecution(self, execution_id, startTime, completeTime, userEmail, status): + """ + Insert new entry into execution table + """ if execution_id is None: execution_id = uuid.uuid4() - cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email) VALUES (%s, %s, %s, %s)" - self._session.execute(cql, (execution_id, startTime, completeTime, userEmail)) + cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email, status) VALUES (%s, %s, %s, %s, %s)" + self._session.execute(cql, (execution_id, startTime, completeTime, userEmail, status)) + return execution_id + + def __updateExecution(self, execution_id, complete_time, status, message=None): + cql = "UPDATE doms_executions SET time_completed = %s, status = %s, message = %s WHERE id=%s" + self._session.execute(cql, (complete_time, status, message, execution_id)) return execution_id def __insertParams(self, execution_id, params): @@ -248,7 +263,7 @@ class ResultsRetrieval(AbstractResultsContainer): if isinstance(execution_id, str): execution_id = uuid.UUID(execution_id) - params = self.__retrieveParams(execution_id) + params = self.retrieveParams(execution_id) stats = 
self.__retrieveStats(execution_id) data = self.__retrieveData(execution_id, trim_data=trim_data) return params, stats, data @@ -302,7 +317,9 @@ class ResultsRetrieval(AbstractResultsContainer): "depth": row.depth } for key in row.measurement_values: - value = float(row.measurement_values[key]) + value = (row.measurement_values[key]) + if value is not None: + value = float(value) entry[key] = value return entry @@ -321,7 +338,7 @@ class ResultsRetrieval(AbstractResultsContainer): raise Exception("Execution not found with id '%s'" % id) - def __retrieveParams(self, id): + def retrieveParams(self, id): cql = "SELECT * FROM doms_params where execution_id = %s limit 1" rows = self._session.execute(cql, (id,)) for row in rows: @@ -341,3 +358,23 @@ class ResultsRetrieval(AbstractResultsContainer): return params raise Exception("Execution not found with id '%s'" % id) + + def retrieveExecution(self, execution_id): + """ + Retrieve execution details from database. + + :param execution_id: Execution ID + :return: execution status dictionary + """ + + cql = "SELECT * FROM doms_executions where id = %s limit 1" + rows = self._session.execute(cql, (execution_id,)) + for row in rows: + return { + 'status': row.status, + 'message': row.message, + 'timeCompleted': row.time_completed, + 'timeStarted': row.time_started + } + + raise ValueError('Execution not found with id %s', execution_id) diff --git a/analysis/webservice/algorithms/doms/__init__.py b/analysis/webservice/algorithms/doms/__init__.py index 9580d38..bc568f8 100644 --- a/analysis/webservice/algorithms/doms/__init__.py +++ b/analysis/webservice/algorithms/doms/__init__.py @@ -15,6 +15,8 @@ from . import BaseDomsHandler +from . import ExecutionStatus +from . import ExecutionCancel from . import DatasetListQuery from . import DomsInitialization from . 
import MatchupQuery diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py index 1274b64..fd998e6 100644 --- a/analysis/webservice/algorithms_spark/Matchup.py +++ b/analysis/webservice/algorithms_spark/Matchup.py @@ -12,8 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - +import uuid from typing import Optional import logging import threading @@ -30,16 +29,17 @@ from pytz import timezone, UTC from scipy import spatial from shapely import wkt from shapely.geometry import box +import functools from webservice.NexusHandler import nexus_handler -from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler +from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler from webservice.algorithms.doms import config as edge_endpoints from webservice.algorithms.doms import values as doms_values -from webservice.algorithms.doms.BaseDomsHandler import DomsQueryResults from webservice.algorithms.doms.ResultsStorage import ResultsStorage from webservice.algorithms.doms.insitu import query_insitu as query_edge from webservice.algorithms.doms.insitu import query_insitu_schema from webservice.webmodel import NexusProcessingException +from webservice.webmodel.NexusExecutionResults import ExecutionStatus EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' @@ -66,7 +66,7 @@ def iso_time_to_epoch(str_time): @nexus_handler -class Matchup(NexusCalcSparkHandler): +class Matchup(NexusCalcSparkTornadoHandler): name = "Matchup" path = "/match_spark" description = "Match measurements between two or more datasets" @@ -147,7 +147,12 @@ class Matchup(NexusCalcSparkHandler): singleton = True def __init__(self, algorithm_config=None, sc=None, tile_service_factory=None, config=None): - 
NexusCalcSparkHandler.__init__(self, algorithm_config=algorithm_config, sc=sc, tile_service_factory=tile_service_factory) + NexusCalcSparkTornadoHandler.__init__( + self, + algorithm_config=algorithm_config, + sc=sc, + tile_service_factory=tile_service_factory + ) self.log = logging.getLogger(__name__) self.tile_service_factory = tile_service_factory self.config = config @@ -219,7 +224,66 @@ class Matchup(NexusCalcSparkHandler): depth_min, depth_max, time_tolerance, radius_tolerance, \ platforms, match_once, result_size_limit - def calc(self, request, **args): + def async_calc(self, execution_id, tile_ids, bounding_polygon, primary_ds_name, + secondary_ds_names, parameter_s, start_time, end_time, depth_min, + depth_max, time_tolerance, radius_tolerance, platforms, match_once, + result_size_limit, start): + # Call spark_matchup + self.log.debug("Calling Spark Driver") + + try: + self._sc.setJobGroup(execution_id, execution_id) + spark_result = spark_matchup_driver( + tile_ids, wkt.dumps(bounding_polygon), + primary_ds_name, + secondary_ds_names, + parameter_s, + depth_min, + depth_max, time_tolerance, + radius_tolerance, + platforms, + match_once, + self.tile_service_factory, + sc=self._sc + ) + except Exception as error: + self.log.exception(error) + end = datetime.utcnow() + with ResultsStorage(self.config) as storage: + storage.updateExecution( + uuid.UUID(execution_id), + completeTime=end, + status=ExecutionStatus.FAILED.value, + message=error, + stats=None, + results=None + ) + return + + self.log.debug("Building and saving results") + end = datetime.utcnow() + + total_keys = len(list(spark_result.keys())) + total_values = sum(len(v) for v in spark_result.values()) + details = { + "timeToComplete": int((end - start).total_seconds()), + "numSecondaryMatched": total_values, + "numPrimaryMatched": total_keys + } + + matches = Matchup.convert_to_matches(spark_result) + + with ResultsStorage(self.config) as storage: + storage.updateExecution( + 
uuid.UUID(execution_id), + completeTime=end, + status=ExecutionStatus.SUCCESS.value, + message=None, + stats=details, + results=matches + ) + + def calc(self, request, tornado_io_loop, **args): start = datetime.utcnow() # TODO Assuming Satellite primary bounding_polygon, primary_ds_name, secondary_ds_names, parameter_s, \ @@ -227,33 +291,6 @@ class Matchup(NexusCalcSparkHandler): depth_min, depth_max, time_tolerance, radius_tolerance, \ platforms, match_once, result_size_limit = self.parse_arguments(request) - with ResultsStorage(self.config) as resultsStorage: - - execution_id = str(resultsStorage.insertExecution(None, start, None, None)) - - self.log.debug("Querying for tiles in search domain") - # Get tile ids in box - tile_ids = [tile.tile_id for tile in - self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name, - start_seconds_from_epoch, end_seconds_from_epoch, - fetch_data=False, fl='id', - sort=['tile_min_time_dt asc', 'tile_min_lon asc', - 'tile_min_lat asc'], rows=5000)] - - self.log.info('Found %s tile_ids', len(tile_ids)) - # Call spark_matchup - self.log.debug("Calling Spark Driver") - try: - spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name, - secondary_ds_names, parameter_s, depth_min, depth_max, time_tolerance, - radius_tolerance, platforms, match_once, self.tile_service_factory, sc=self._sc) - except Exception as e: - self.log.exception(e) - raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500) - - end = datetime.utcnow() - - self.log.debug("Building and saving results") args = { "primary": primary_ds_name, "matchup": secondary_ds_names, @@ -272,35 +309,60 @@ class Matchup(NexusCalcSparkHandler): if depth_max is not None: args["depthMax"] = float(depth_max) - total_keys = len(list(spark_result.keys())) - total_values = sum(len(v) for v in spark_result.values()) - details = { - "timeToComplete": int((end - start).total_seconds()), - 
"numSecondaryMatched": total_values, - "numPrimaryMatched": total_keys - } - matches = Matchup.convert_to_matches(spark_result) + with ResultsStorage(self.config) as resultsStorage: + execution_id = str(resultsStorage.insertInitialExecution( + params=args, + startTime=start, + status=ExecutionStatus.RUNNING.value + )) - def do_result_insert(): - with ResultsStorage(self.config) as storage: - storage.insertResults(results=matches, params=args, stats=details, - startTime=start, completeTime=end, userEmail="", - execution_id=execution_id) + self.log.debug("Querying for tiles in search domain") + # Get tile ids in box + tile_ids = [tile.tile_id for tile in + self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name, + start_seconds_from_epoch, end_seconds_from_epoch, + fetch_data=False, fl='id', + sort=['tile_min_time_dt asc', 'tile_min_lon asc', + 'tile_min_lat asc'], rows=5000)] - threading.Thread(target=do_result_insert).start() + self.log.info('Found %s tile_ids', len(tile_ids)) - # Get only the first "result_size_limit" results - # '0' means returns everything - if result_size_limit > 0: - matches = matches[0:result_size_limit] + if not tile_ids: + # There are no matching tiles + end = datetime.utcnow() + with ResultsStorage(self.config) as storage: + storage.updateExecution( + uuid.UUID(execution_id), + completeTime=end, + status=ExecutionStatus.FAILED.value, + message='No tiles matched the provided domain' + ) - result = DomsQueryResults(results=matches, args=args, - details=details, bounds=None, - count=len(matches), computeOptions=None, - executionId=execution_id) + # Start async processing with Spark. Do not wait for response + # before returning to user. 
+ tornado_io_loop.run_in_executor(request.requestHandler.executor, functools.partial( + self.async_calc, + execution_id=execution_id, + tile_ids=tile_ids, + bounding_polygon=bounding_polygon, + primary_ds_name=primary_ds_name, + secondary_ds_names=secondary_ds_names, + parameter_s=parameter_s, + start_time=start_time, + end_time=end_time, + depth_min=depth_min, + depth_max=depth_max, + time_tolerance=time_tolerance, + radius_tolerance=radius_tolerance, + platforms=platforms, + match_once=match_once, + result_size_limit=result_size_limit, + start=start + )) + + request.requestHandler.redirect(f'/job?id={execution_id}') - return result @classmethod def convert_to_matches(cls, spark_result): diff --git a/analysis/webservice/algorithms/doms/__init__.py b/analysis/webservice/algorithms_spark/NexusCalcSparkTornadoHandler.py similarity index 61% copy from analysis/webservice/algorithms/doms/__init__.py copy to analysis/webservice/algorithms_spark/NexusCalcSparkTornadoHandler.py index 9580d38..c2a38c5 100644 --- a/analysis/webservice/algorithms/doms/__init__.py +++ b/analysis/webservice/algorithms_spark/NexusCalcSparkTornadoHandler.py @@ -13,22 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging +from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler -from . import BaseDomsHandler -from . import DatasetListQuery -from . import DomsInitialization -from . import MatchupQuery -from . import MetadataQuery -from . import ResultsPlotQuery -from . import ResultsRetrieval -from . import ResultsStorage -from . import StatsQuery -from . import ValuesQuery -from . import config -from . import datafetch -from . import fetchedgeimpl -from . import geo -from . import insitusubset -from . import subsetter -from . import values -from . 
import workerthread +logger = logging.getLogger(__name__) + + +class NexusCalcSparkTornadoHandler(NexusCalcSparkHandler): + pass diff --git a/analysis/webservice/apidocs/openapi.yml b/analysis/webservice/apidocs/openapi.yml index 55802cd..3f3490b 100644 --- a/analysis/webservice/apidocs/openapi.yml +++ b/analysis/webservice/apidocs/openapi.yml @@ -646,6 +646,42 @@ paths: application/json: schema: $ref: '#/components/schemas/Error' + /job: + get: + summary: | + Get job status + operationId: job + tags: + - Analytics + description: "Get job status by execution id" + parameters: + - in: query + name: id + description: | + The job execution ID + required: true + schema: + type: string + format: uuid + example: c864a51b-3d87-4872-9070-632820b1cae2 + /job/cancel: + get: + summary: | + Cancel running job + operationId: jobCancel + tags: + - Analytics + description: "Cancel running job" + parameters: + - in: query + name: id + description: | + The job execution ID + required: true + schema: + type: string + format: uuid + example: c864a51b-3d87-4872-9070-632820b1cae2 externalDocs: description: Documentation url: https://incubator-sdap-nexus.readthedocs.io/en/latest/index.html diff --git a/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py b/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py index fed08b9..2a84ae7 100644 --- a/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py +++ b/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py @@ -37,6 +37,7 @@ class HandlerArgsBuilder: class_wrapper == webservice.algorithms_spark.Matchup.Matchup or class_wrapper == webservice.algorithms_spark.MatchupDoms.MatchupDoms or issubclass(class_wrapper, webservice.algorithms.doms.BaseDomsHandler.BaseDomsQueryCalcHandler) + or issubclass(class_wrapper, webservice.algorithms_spark.NexusCalcSparkTornadoHandler.NexusCalcSparkTornadoHandler) or class_wrapper == 
webservice.algorithms.doms.ResultsRetrieval.DomsResultsRetrievalHandler or class_wrapper == webservice.algorithms.doms.ResultsPlotQuery.DomsResultsPlotHandler ) diff --git a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py index 2645574..95bddf4 100644 --- a/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py +++ b/analysis/webservice/nexus_tornado/request/handlers/NexusRequestHandler.py @@ -20,6 +20,7 @@ import tornado.ioloop from webservice.nexus_tornado.request.renderers import NexusRendererFactory from webservice.webmodel import NexusRequestObjectTornadoFree, NexusRequestObject, NexusProcessingException +from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler class NexusRequestHandler(tornado.web.RequestHandler): @@ -44,18 +45,29 @@ class NexusRequestHandler(tornado.web.RequestHandler): # create NexusCalcHandler which will process the request instance = self.__clazz(**self._clazz_init_args) + io_loop = tornado.ioloop.IOLoop.current() + try: - # process the request asynchronously on a different thread, - # the current tornado handler is still available to get other user requests - results = yield tornado.ioloop.IOLoop.current().run_in_executor(self.executor, instance.calc, request) + if isinstance(instance, NexusCalcSparkTornadoHandler): + results = instance.calc(request, io_loop) + else: + results = yield io_loop.run_in_executor( + self.executor, + instance.calc, + request + ) try: self.set_status(results.status_code) except AttributeError: pass - renderer = NexusRendererFactory.get_renderer(request) - renderer.render(self, results) + # Only render results if there are results to render. + # "NexusCalcSparkTornadoHandler" endpoints redirect, so no + # need to render. 
+ if not isinstance(instance, NexusCalcSparkTornadoHandler): + renderer = NexusRendererFactory.get_renderer(request) + renderer.render(self, results) except NexusProcessingException as e: self.async_onerror_callback(e.reason, e.code) diff --git a/analysis/webservice/webmodel/NexusExecutionResults.py b/analysis/webservice/webmodel/NexusExecutionResults.py new file mode 100644 index 0000000..736ba76 --- /dev/null +++ b/analysis/webservice/webmodel/NexusExecutionResults.py @@ -0,0 +1,149 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json +from enum import Enum + + +ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' + + +class ExecutionStatus(Enum): + RUNNING = 'running' + SUCCESS = 'success' + FAILED = 'failed' + CANCELLED = 'cancelled' + + +def construct_job_status(job_state, created, updated, execution_id, params, host, message=''): + return { + 'status': job_state.value, + 'message': message, + 'createdAt': created, + 'updatedAt': updated, + 'links': [{ + 'href': f'{host}/job?id={execution_id}', + 'title': 'The current page', + 'type': 'application/json', + 'rel': 'self' + }], + 'params': params, + 'jobID': execution_id + } + + +def construct_done(status, created, completed, execution_id, params, host): + job_body = construct_job_status( + status, + created, + completed, + execution_id, + params, + host + ) + + # Construct urls + formats = [ + 'CSV', + 'JSON', + 'NETCDF' + ] + data_links = [{ + 'href': f'{host}/cdmsresults?id={execution_id}&output={output_format}', + 'title': 'Download results', + 'rel': 'data' + } for output_format in formats] + job_body['links'].extend(data_links) + return job_body + + +def construct_running(status, created, execution_id, params, host): + job_body = construct_job_status( + status, + created, + None, + execution_id, + params, + host + ) + job_body['links'].append({ + 'href': f'{host}/job/cancel?id={execution_id}', + 'title': 'Cancel the job', + 'rel': 'cancel' + }) + return job_body + + +def construct_error(status, created, completed, execution_id, message, params, host): + return construct_job_status( + status, + created, + completed, + execution_id, + params, + host, + message + ) + + +def construct_cancelled(status, created, completed, execution_id, params, host): + return construct_job_status( + status, + created, + completed, + execution_id, + params, + host + ) + + +class NexusExecutionResults: + def __init__(self, status=None, created=None, completed=None, execution_id=None, message='', + params=None, host=None, status_code=200): + self.status_code = 
status_code + self.status = status + self.created = created + self.completed = completed + self.execution_id = execution_id + self.message = message + self.execution_params = params + self.host = host + + def toJson(self): + params = { + 'status': self.status, + 'created': self.created, + 'execution_id': self.execution_id, + 'params': self.execution_params, + 'host': self.host + } + if self.status == ExecutionStatus.SUCCESS: + params['completed'] = self.completed + construct = construct_done + elif self.status == ExecutionStatus.RUNNING: + construct = construct_running + elif self.status == ExecutionStatus.FAILED: + params['completed'] = self.completed + params['message'] = self.message + construct = construct_error + elif self.status == ExecutionStatus.CANCELLED: + params['completed'] = self.completed + construct = construct_cancelled + else: + # Raise error -- job state is invalid + raise ValueError('Unable to fetch status for execution {}', self.execution_id) + + job_details = construct(**params) + return json.dumps(job_details, indent=4, default=str) diff --git a/docker/nexus-webapp/Dockerfile b/docker/nexus-webapp/Dockerfile index 1caf462..0f53ce1 100644 --- a/docker/nexus-webapp/Dockerfile +++ b/docker/nexus-webapp/Dockerfile @@ -95,10 +95,10 @@ RUN python3 setup.py install clean && mamba clean -afy WORKDIR /incubator-sdap-nexus/tools/deletebyquery -RUN pip3 install cassandra-driver==3.20.1 --install-option="--no-cython" +RUN pip3 install cython +RUN pip3 install cassandra-driver==3.20.1 RUN pip3 install pyspark py4j RUN pip3 install -r requirements.txt -RUN pip3 install cython RUN rm requirements.txt WORKDIR /incubator-sdap-nexus
