This is an automated email from the ASF dual-hosted git repository. tloubrieu pushed a commit to branch add_metadata_to_remote_coll in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit 343ebcc269ecc3b610138085a331f9d56567ec5f Author: Thomas Loubrieu <[email protected]> AuthorDate: Thu Jul 7 12:16:54 2022 -0700 makes --collection-path optional, add remote collection cache for list metadata --- analysis/tests/redirect/test_RemoteSDAPCache.py | 46 ++++++++++++++++++++++ analysis/webservice/algorithms/DataSeriesList.py | 37 ++++++++++++----- .../app_builders/HandlerArgsBuilder.py | 12 ++++-- .../nexus_tornado/app_builders/NexusAppBuilder.py | 4 +- analysis/webservice/redirect/RedirectHandler.py | 1 + analysis/webservice/redirect/RemoteSDAPCache.py | 42 ++++++++++++++++++++ analysis/webservice/redirect/__init__.py | 4 +- analysis/webservice/webapp.py | 27 +++++++------ 8 files changed, 144 insertions(+), 29 deletions(-)

Review notes (reviewer commentary in the patch-notes area; not part of the commit, ignored by git am):
1. RemoteSDAPCache.get(): the refresh test is inverted. `self.sdap_lists[stripped_url].outdated_at>datetime.now()` is true while an entry is still fresh, so every call re-downloads `<url>/list`; once an entry has expired it is never refreshed again. Suggested fix: `if stripped_url not in self.sdap_lists or self.sdap_lists[stripped_url].outdated_at <= datetime.now():`
2. DataSeriesListCalcHandlerImpl (/list handler): `del remote_collection['shortName']` mutates the dict stored inside the cache. get() returns the element of `self.sdap_lists[...].list` itself, and `remote_sdaps` is a class attribute shared by all handler instances. Whenever that cached list is reused instead of refetched (today: after the entry expires; after fix 1: on every call within max_age), get() evaluates `collection['shortName']` on the mutated entry and can raise KeyError. Only CollectionNotFound is caught, so /list fails. Suggested fix: copy instead of mutating, e.g. `current_collection.update({k: v for k, v in remote_collection.items() if k != 'shortName'})`
3. RemoteSDAPCache._add()/get(): CollectionNotFound is built logging-style, e.g. `CollectionNotFound("url %s was not reachable, responded with status %s", list_url, r.status_code)`. Exception does not interpolate `%s`, so str(e), which is what `logger.warning(e)` prints, is the raw args tuple. Use f-strings, e.g. `raise CollectionNotFound(f"url {list_url} was not reachable, responded with status {r.status_code}")` and `raise CollectionNotFound(f"collection {short_name} has not been found in url {stripped_url}")`.
4. Network failures are not handled. `requests.get(list_url, timeout=timeout)` raises requests.exceptions.RequestException subclasses (ConnectTimeout, ConnectionError, ...), and `r.json()` raises on a non-JSON body. The handler only catches CollectionNotFound, so one unreachable remote SDAP makes the whole /list request fail. Suggested fix in _add(): wrap the request and decode and re-raise, e.g. `except requests.exceptions.RequestException as e: raise CollectionNotFound(f"url {list_url} was not reachable: {e}") from e`. The test module defines `mocked_requests_get_timeout` but no test uses it; add a test asserting CollectionNotFound on timeout, and one asserting that a second get() within max_age does not call requests.get again.
5. DataSeriesList.py: `collection_list = self._get_tile_service().get_dataseries_list()` is commented out and replaced by `collection_list = []`, so /list now reports only remote collections. This looks like a debugging leftover; restore the call unless dropping local datasets is intended.
6. DataSeriesList.py uses `logger.warning(e)`, but no `logger` definition for that module appears in this diff (RedirectHandler.py defines one with `logging.getLogger(__name__)`). Confirm DataSeriesList.py defines `logger`; otherwise the except branch itself raises NameError.
7. Minor:
   - `finally: collection_list.append(current_collection)` does not need `finally`; a plain statement after the try/except states the intent more clearly.
   - The dataclass field `RemoteSDAPList.list` shadows the builtin `list`.
   - `url.strip('/')` also strips leading slashes; `rstrip('/')` is what is meant.
   - The test helpers take `*asgs` (typo for `*args`).
   - The webapp.py comment `# build retirect app` should read `redirect`.
   - The commit message says `--collection-path`, but the option defined in webapp.py is `collections_path`.

diff --git a/analysis/tests/redirect/test_RemoteSDAPCache.py b/analysis/tests/redirect/test_RemoteSDAPCache.py new file mode 100644 index 0000000..237659f --- /dev/null +++ b/analysis/tests/redirect/test_RemoteSDAPCache.py @@ -0,0 +1,46 @@ +import unittest +from unittest import mock +import requests + +from webservice.redirect import RemoteSDAPCache + + +class MockResponse: + def __init__(self, json_data, status_code): + self.json_data = json_data + self.status_code = status_code + + def json(self): + return self.json_data + +def mocked_requests_get(*asgs, **kwargs): + json_data = [ + { + "shortName": "PM25", + "title": "PM25", + "tileCount": 21515, + "start": 1514818800.0, + "end": 1640991600.0, + "iso_start": "2018-01-01T15:00:00+0000", + "iso_end": "2021-12-31T23:00:00+0000" + } + ] + return MockResponse(json_data, 200) + + +def mocked_requests_get_timeout(*asgs, **kwargs): + raise requests.exceptions.ConnectTimeout() + + +class MyTestCase(unittest.TestCase): + + @mock.patch('requests.get', side_effect=mocked_requests_get) + def test_get(self, mock_get): + remote_sdap_cache = RemoteSDAPCache() + + collection = remote_sdap_cache.get('https://aq-sdap.stcenter.net/nexus/', 'PM25') + self.assertEqual(collection["start"], 
1514818800.0) + + +if __name__ == '__main__': + unittest.main() diff --git a/analysis/webservice/algorithms/DataSeriesList.py b/analysis/webservice/algorithms/DataSeriesList.py index e79df4e..09c5a2a 100644 --- a/analysis/webservice/algorithms/DataSeriesList.py +++ b/analysis/webservice/algorithms/DataSeriesList.py @@ -17,6 +17,8 @@ import json from webservice.algorithms.NexusCalcHandler import NexusCalcHandler +from webservice.redirect import RemoteSDAPCache +from webservice.redirect import CollectionNotFound from webservice.NexusHandler import nexus_handler from webservice.webmodel import cached @@ -31,8 +33,9 @@ class DataSeriesListCalcHandlerImpl(NexusCalcHandler): path = "/list" description = "Lists datasets currently available for analysis" params = {} + remote_sdaps = RemoteSDAPCache() - def __init__(self, tile_service_factory, remote_collections, **kwargs): + def __init__(self, tile_service_factory, remote_collections=None, **kwargs): super().__init__(tile_service_factory, **kwargs) self._remote_collections = remote_collections @@ -46,16 +49,30 @@ class DataSeriesListCalcHandlerImpl(NexusCalcHandler): def toJson(self): return json.dumps(self.result) - collection_list = self._get_tile_service().get_dataseries_list() + #collection_list = self._get_tile_service().get_dataseries_list() + collection_list = [] # add remote collections - for collection in self._remote_collections.values(): - collection_list.append( - { - "shortName": collection["id"], - "remoteUrl": collection["path"], - "remoteShortName": collection["remote_id"] if 'remote_id' in collection else collection["id"] - } - ) + if self._remote_collections: + for collection in self._remote_collections.values(): + + current_collection = { + "shortName": collection["id"], + "remoteUrl": collection["path"], + "remoteShortName": collection["remote_id"] if 'remote_id' in collection else collection["id"] + } + + try: + remote_collection = self.remote_sdaps.get( + collection["path"], 
current_collection["remoteShortName"] + ) + del remote_collection['shortName'] + current_collection.update(remote_collection) + + except CollectionNotFound as e: + logger.warning(e) + finally: + collection_list.append(current_collection) return SimpleResult(collection_list) diff --git a/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py b/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py index ffcca13..e3edbaf 100644 --- a/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py +++ b/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py @@ -4,7 +4,13 @@ from .SparkContextBuilder import SparkContextBuilder class HandlerArgsBuilder: - def __init__(self, max_request_threads, tile_service_factory, algorithm_config, remote_collections): + def __init__( + self, + max_request_threads, + tile_service_factory, + algorithm_config, + remote_collections=None + ): self.request_thread_pool = tornado.concurrent.futures.ThreadPoolExecutor(max_request_threads) self.tile_service_factory = tile_service_factory self.algorithm_config = algorithm_config @@ -22,7 +28,7 @@ class HandlerArgsBuilder: @staticmethod def handler_needs_remote_collections(class_wrapper): - return class_wrapper == webservice.algorithms.DataSeriesList.D + return class_wrapper == webservice.algorithms.DataSeriesList.DataSeriesListCalcHandlerImpl def get_args(self, clazz_wrapper): args = dict( @@ -37,7 +43,7 @@ class HandlerArgsBuilder: if self.handler_needs_algorithm_config(clazz_wrapper): args['config'] = self.algorithm_config - if clazz_wrapper == webservice.algorithms.DataSeriesList.DataSeriesListCalcHandlerImpl: + if self.handler_needs_remote_collections(clazz_wrapper): args['remote_collections'] = self.remote_collections return args diff --git a/analysis/webservice/nexus_tornado/app_builders/NexusAppBuilder.py b/analysis/webservice/nexus_tornado/app_builders/NexusAppBuilder.py index 20eb335..80f1c92 100644 --- 
a/analysis/webservice/nexus_tornado/app_builders/NexusAppBuilder.py +++ b/analysis/webservice/nexus_tornado/app_builders/NexusAppBuilder.py @@ -30,7 +30,7 @@ class NexusAppBuilder: r'/apidocs/(.*)', tornado.web.StaticFileHandler, {'path': str(apidocs_path), "default_filename": "index.html"})) - def set_modules(self, module_dir, algorithm_config, remote_collections, max_request_threads=4): + def set_modules(self, module_dir, algorithm_config, remote_collections=None, max_request_threads=4): for moduleDir in module_dir: self.log.info("Loading modules from %s" % moduleDir) importlib.import_module(moduleDir) @@ -44,7 +44,7 @@ class NexusAppBuilder: max_request_threads, tile_service_factory, algorithm_config, - remote_collections + remote_collections=remote_collections ) for clazzWrapper in NexusHandler.AVAILABLE_HANDLERS: diff --git a/analysis/webservice/redirect/RedirectHandler.py b/analysis/webservice/redirect/RedirectHandler.py index 7ffda43..58b5aff 100644 --- a/analysis/webservice/redirect/RedirectHandler.py +++ b/analysis/webservice/redirect/RedirectHandler.py @@ -4,6 +4,7 @@ from webservice.webmodel.RequestParameters import RequestParameters logger = logging.getLogger(__name__) + class RedirectHandler(tornado.web.RequestHandler): def initialize(self, redirected_collections=None): diff --git a/analysis/webservice/redirect/RemoteSDAPCache.py b/analysis/webservice/redirect/RemoteSDAPCache.py new file mode 100644 index 0000000..17adb6f --- /dev/null +++ b/analysis/webservice/redirect/RemoteSDAPCache.py @@ -0,0 +1,42 @@ +import requests +from datetime import datetime +from datetime import timedelta +from dataclasses import dataclass + + +@dataclass +class RemoteSDAPList: + list: dict + outdated_at: datetime + + +class CollectionNotFound(Exception): + pass + + +class RemoteSDAPCache: + def __init__(self): + self.sdap_lists = {} + + def _add(self, url, timeout=2, max_age=3600*24): + list_url = f"{url}/list" + r = requests.get(list_url, timeout=timeout) + if 
r.status_code == 200: + self.sdap_lists[url] = RemoteSDAPList( + list=r.json(), + outdated_at=datetime.now()+timedelta(seconds=max_age) + ) + else: + raise CollectionNotFound("url %s was not reachable, responded with status %s", list_url, r.status_code) + + def get(self, url, short_name): + stripped_url = url.strip('/') + if stripped_url not in self.sdap_lists or self.sdap_lists[stripped_url].outdated_at>datetime.now(): + self._add(stripped_url) + + for collection in self.sdap_lists[stripped_url].list: + if collection['shortName'] == short_name: + return collection + + raise CollectionNotFound("collection %s has not been found in url %s", short_name, stripped_url) + diff --git a/analysis/webservice/redirect/__init__.py b/analysis/webservice/redirect/__init__.py index 201e7a2..be5fe45 100644 --- a/analysis/webservice/redirect/__init__.py +++ b/analysis/webservice/redirect/__init__.py @@ -1,2 +1,4 @@ from .RedirectHandler import RedirectHandler -from .RemoteCollectionMatcher import RemoteCollectionMatcher \ No newline at end of file +from .RemoteCollectionMatcher import RemoteCollectionMatcher +from .RemoteSDAPCache import RemoteSDAPCache +from .RemoteSDAPCache import CollectionNotFound \ No newline at end of file diff --git a/analysis/webservice/webapp.py b/analysis/webservice/webapp.py index 8db6459..e060d3c 100644 --- a/analysis/webservice/webapp.py +++ b/analysis/webservice/webapp.py @@ -74,18 +74,27 @@ def main(): define('cassandra_host', help='cassandra host') define('cassandra_username', help='cassandra username') define('cassandra_password', help='cassandra password') - define('collections_path', help='collection config path') + define('collections_path', default=None, help='collection config path') parse_command_line() algorithm_config = inject_args_in_config(options, algorithm_config) - remote_collection_matcher = RemoteCollectionMatcher(options.collections_path) + remote_collections = None + router_rules = [] + if options.collections_path: + # build 
retirect app + remote_collection_matcher = RemoteCollectionMatcher(options.collections_path) + remote_collections = remote_collection_matcher.get_remote_collections() + remote_sdap_app = RedirectAppBuilder(remote_collection_matcher).build( + host=options.address, + debug=options.debug) + router_rules.append(Rule(remote_collection_matcher, remote_sdap_app)) # build nexus app nexus_app_builder = NexusAppBuilder().set_modules( web_config.get("modules", "module_dirs").split(","), algorithm_config, - remote_collection_matcher.get_remote_collections() + remote_collections=remote_collections ) if web_config.get("static", "static_enabled") == "true": @@ -96,17 +105,9 @@ def main(): log.info("Static resources disabled") local_sdap_app = nexus_app_builder.build(host=options.address, debug=options.debug) + router_rules.append(Rule(AnyMatches(), local_sdap_app)) - # build redirect app - remote_sdap_app = RedirectAppBuilder(remote_collection_matcher).build( - host=options.address, - debug=options.debug) - - router = RuleRouter([ - Rule(remote_collection_matcher, remote_sdap_app), - Rule(AnyMatches(), local_sdap_app) - ] - ) + router = RuleRouter(router_rules) log.info("Initializing on host address '%s'" % options.address) log.info("Initializing on port '%s'" % options.port)
