This is an automated email from the ASF dual-hosted git repository.
nchung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/master by this push:
new f72d429 Add metadata to remote coll (#173)
f72d429 is described below
commit f72d429c189f2b6597d324d4d6d4180a8cc31bf7
Author: thomas loubrieu <[email protected]>
AuthorDate: Wed Jul 27 15:03:29 2022 -0700
Add metadata to remote coll (#173)
* fix bugs in helm chart after update for proxy feature
* update CHANGELOG for ticket 388 (proxy)
* makes --collection-path optional, add remote collection cache for list
metadata
* add unit tests for remote collection cache
* add versions to avoid build failure in docker
* make code more robust
Co-authored-by: tloubrieu-jpl <[email protected]>
Co-authored-by: thomas loubrieu <[email protected]>
---
analysis/tests/redirect/test_RemoteSDAPCache.py | 102 +++++++++++++++++++++
analysis/webservice/algorithms/DataSeriesList.py | 34 +++++--
.../app_builders/HandlerArgsBuilder.py | 12 ++-
.../nexus_tornado/app_builders/NexusAppBuilder.py | 4 +-
analysis/webservice/redirect/RedirectHandler.py | 1 +
analysis/webservice/redirect/RemoteSDAPCache.py | 48 ++++++++++
analysis/webservice/redirect/__init__.py | 4 +-
analysis/webservice/webapp.py | 27 +++---
data-access/requirements.txt | 3 +-
9 files changed, 206 insertions(+), 29 deletions(-)
diff --git a/analysis/tests/redirect/test_RemoteSDAPCache.py
b/analysis/tests/redirect/test_RemoteSDAPCache.py
new file mode 100644
index 0000000..7a40e2f
--- /dev/null
+++ b/analysis/tests/redirect/test_RemoteSDAPCache.py
@@ -0,0 +1,102 @@
+import unittest
+from unittest import mock
+import requests
+from datetime import datetime
+from datetime import timedelta
+
+from webservice.redirect import RemoteSDAPCache
+from webservice.redirect import CollectionNotFound
+from webservice.redirect.RemoteSDAPCache import RemoteSDAPList
+
+class MockResponse:
+    """Minimal stand-in for the response object returned by ``requests.get``.
+
+    Exposes only what the code under test reads: ``status_code``, ``text``
+    and ``json()``.
+    """
+
+    def __init__(self, json_data, status_code):
+        self.json_data = json_data
+        self.status_code = status_code
+
+    @property
+    def text(self):
+        """Return the payload serialized as a JSON string.
+
+        RemoteSDAPCache._add logs ``r.text`` on a 200 response, so the mock
+        has to provide it or the successful paths raise AttributeError.
+        """
+        import json
+        return json.dumps(self.json_data)
+
+    def json(self):
+        """Return the payload as already-decoded JSON data."""
+        return self.json_data
+
+# Payload served by the mocked remote ``/list`` endpoint: a single collection.
+LIST_CONTENT = [
+    dict(
+        shortName="PM25",
+        title="PM25",
+        tileCount=21515,
+        start=1514818800.0,
+        end=1640991600.0,
+        iso_start="2018-01-01T15:00:00+0000",
+        iso_end="2021-12-31T23:00:00+0000",
+    ),
+]
+
+# Independent copy of the same collection with a different ``start``, used to
+# tell a value served from the cache apart from a freshly fetched one.
+LIST_CONTENT_FORMER = [{**LIST_CONTENT[0], 'start': 0}]
+
+def mocked_requests_get(*args, **kwargs):
+    """Replacement for ``requests.get``: answer 200 with LIST_CONTENT, whatever the arguments."""
+    response = MockResponse(LIST_CONTENT, 200)
+    return response
+
+
+def mocked_requests_get_timeout(*args, **kwargs):
+    """Replacement for ``requests.get`` that always fails with a connect timeout."""
+    timeout_error = requests.exceptions.ConnectTimeout()
+    raise timeout_error
+
+
+def mocked_requests_get_not_found(*args, **kwargs):
+    """Replacement for ``requests.get``: answer 404 with an empty JSON object."""
+    response = MockResponse({}, 404)
+    return response
+
+
+
+class MyTestCase(unittest.TestCase):
+    """Tests of RemoteSDAPCache.get with ``requests.get`` patched out.
+
+    RemoteSDAPCache._add stores ``outdated_at`` as fetch time plus max_age,
+    i.e. as the instant at which an entry expires: fixtures use a future
+    ``outdated_at`` for a valid entry and a past one for an expired entry.
+    """
+
+    @mock.patch('requests.get', side_effect=mocked_requests_get)
+    def test_get(self, mock_get):
+        remote_sdap_cache = RemoteSDAPCache()
+
+        collection = remote_sdap_cache.get('https://aq-sdap.stcenter.net/nexus/', 'PM25')
+        self.assertEqual(collection["start"], 1514818800.0)
+
+    @mock.patch('requests.get', side_effect=mocked_requests_get_timeout)
+    def test_get_timeout(self, mock_get):
+        remote_sdap_cache = RemoteSDAPCache()
+        with self.assertRaises(CollectionNotFound):
+            remote_sdap_cache.get('https://aq-sdap.stcenter.net/nexus/', 'PM25')
+
+    @mock.patch('requests.get', side_effect=mocked_requests_get_not_found)
+    def test_get_not_found(self, mock_get):
+        remote_sdap_cache = RemoteSDAPCache()
+        with self.assertRaises(CollectionNotFound):
+            remote_sdap_cache.get('https://aq-sdap.stcenter.net/nexus/', 'PM25')
+
+    @mock.patch('requests.get', side_effect=mocked_requests_get)
+    def test_get_expired(self, mock_get):
+        remote_sdap_cache = RemoteSDAPCache()
+
+        # RemoteSDAPCache.get strips the slashes of the url before the lookup,
+        # so the entry has to be stored under the stripped url; with a
+        # trailing slash it is never found and the test would pass without
+        # exercising the expiry at all.
+        remote_sdap_cache.sdap_lists['https://aq-sdap.stcenter.net/nexus'] = RemoteSDAPList(
+            list=LIST_CONTENT_FORMER,
+            outdated_at=datetime.now() - timedelta(seconds=3600)
+        )
+
+        collection = remote_sdap_cache.get('https://aq-sdap.stcenter.net/nexus/', 'PM25')
+
+        # the expired entry must be fetched again, exactly once
+        self.assertEqual(mock_get.call_count, 1)
+        self.assertEqual(collection["start"], 1514818800.0)
+
+    @mock.patch('requests.get', side_effect=mocked_requests_get)
+    def test_get_cached_valid(self, mock_get):
+        remote_sdap_cache = RemoteSDAPCache()
+
+        remote_sdap_cache.sdap_lists['https://aq-sdap.stcenter.net/nexus'] = RemoteSDAPList(
+            list=LIST_CONTENT_FORMER,
+            outdated_at=datetime.now() + timedelta(seconds=3600)
+        )
+
+        collection = remote_sdap_cache.get('https://aq-sdap.stcenter.net/nexus/', 'PM25')
+
+        # the entry is still valid: requests.get must not be called and the
+        # cached value (start == 0) must be served
+        self.assertEqual(mock_get.call_count, 0)
+        self.assertEqual(collection["start"], 0)
+
+
+
+# Run the test cases of this module when the file is executed directly.
+if __name__ == '__main__':
+    unittest.main()
diff --git a/analysis/webservice/algorithms/DataSeriesList.py
b/analysis/webservice/algorithms/DataSeriesList.py
index e79df4e..e247bb6 100644
--- a/analysis/webservice/algorithms/DataSeriesList.py
+++ b/analysis/webservice/algorithms/DataSeriesList.py
@@ -17,6 +17,8 @@
import json
from webservice.algorithms.NexusCalcHandler import NexusCalcHandler
+from webservice.redirect import RemoteSDAPCache
+from webservice.redirect import CollectionNotFound
from webservice.NexusHandler import nexus_handler
from webservice.webmodel import cached
@@ -31,8 +33,9 @@ class DataSeriesListCalcHandlerImpl(NexusCalcHandler):
path = "/list"
description = "Lists datasets currently available for analysis"
params = {}
+ remote_sdaps = RemoteSDAPCache()
- def __init__(self, tile_service_factory, remote_collections, **kwargs):
+ def __init__(self, tile_service_factory, remote_collections=None,
**kwargs):
super().__init__(tile_service_factory, **kwargs)
self._remote_collections = remote_collections
@@ -49,13 +52,26 @@ class DataSeriesListCalcHandlerImpl(NexusCalcHandler):
collection_list = self._get_tile_service().get_dataseries_list()
# add remote collections
- for collection in self._remote_collections.values():
- collection_list.append(
- {
- "shortName": collection["id"],
- "remoteUrl": collection["path"],
- "remoteShortName": collection["remote_id"] if 'remote_id'
in collection else collection["id"]
- }
- )
+ if self._remote_collections:
+ for collection in self._remote_collections.values():
+
+ current_collection = {
+ "shortName": collection["id"],
+ "remoteUrl": collection["path"],
+ "remoteShortName": collection["remote_id"] if
'remote_id' in collection else collection["id"]
+ }
+
+ try:
+ remote_collection = self.remote_sdaps.get(
+ collection["path"],
+ current_collection["remoteShortName"]
+ )
+ del remote_collection['shortName']
+ current_collection.update(remote_collection)
+
+ except CollectionNotFound as e:
+ logger.warning(e)
+ finally:
+ collection_list.append(current_collection)
return SimpleResult(collection_list)
diff --git
a/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py
b/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py
index ffcca13..e3edbaf 100644
--- a/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py
+++ b/analysis/webservice/nexus_tornado/app_builders/HandlerArgsBuilder.py
@@ -4,7 +4,13 @@ from .SparkContextBuilder import SparkContextBuilder
class HandlerArgsBuilder:
- def __init__(self, max_request_threads, tile_service_factory,
algorithm_config, remote_collections):
+ def __init__(
+ self,
+ max_request_threads,
+ tile_service_factory,
+ algorithm_config,
+ remote_collections=None
+ ):
self.request_thread_pool =
tornado.concurrent.futures.ThreadPoolExecutor(max_request_threads)
self.tile_service_factory = tile_service_factory
self.algorithm_config = algorithm_config
@@ -22,7 +28,7 @@ class HandlerArgsBuilder:
@staticmethod
def handler_needs_remote_collections(class_wrapper):
- return class_wrapper == webservice.algorithms.DataSeriesList.D
+ return class_wrapper ==
webservice.algorithms.DataSeriesList.DataSeriesListCalcHandlerImpl
def get_args(self, clazz_wrapper):
args = dict(
@@ -37,7 +43,7 @@ class HandlerArgsBuilder:
if self.handler_needs_algorithm_config(clazz_wrapper):
args['config'] = self.algorithm_config
- if clazz_wrapper ==
webservice.algorithms.DataSeriesList.DataSeriesListCalcHandlerImpl:
+ if self.handler_needs_remote_collections(clazz_wrapper):
args['remote_collections'] = self.remote_collections
return args
diff --git a/analysis/webservice/nexus_tornado/app_builders/NexusAppBuilder.py
b/analysis/webservice/nexus_tornado/app_builders/NexusAppBuilder.py
index 20eb335..80f1c92 100644
--- a/analysis/webservice/nexus_tornado/app_builders/NexusAppBuilder.py
+++ b/analysis/webservice/nexus_tornado/app_builders/NexusAppBuilder.py
@@ -30,7 +30,7 @@ class NexusAppBuilder:
r'/apidocs/(.*)', tornado.web.StaticFileHandler,
{'path': str(apidocs_path), "default_filename": "index.html"}))
- def set_modules(self, module_dir, algorithm_config, remote_collections,
max_request_threads=4):
+ def set_modules(self, module_dir, algorithm_config,
remote_collections=None, max_request_threads=4):
for moduleDir in module_dir:
self.log.info("Loading modules from %s" % moduleDir)
importlib.import_module(moduleDir)
@@ -44,7 +44,7 @@ class NexusAppBuilder:
max_request_threads,
tile_service_factory,
algorithm_config,
- remote_collections
+ remote_collections=remote_collections
)
for clazzWrapper in NexusHandler.AVAILABLE_HANDLERS:
diff --git a/analysis/webservice/redirect/RedirectHandler.py
b/analysis/webservice/redirect/RedirectHandler.py
index 7ffda43..58b5aff 100644
--- a/analysis/webservice/redirect/RedirectHandler.py
+++ b/analysis/webservice/redirect/RedirectHandler.py
@@ -4,6 +4,7 @@ from webservice.webmodel.RequestParameters import
RequestParameters
logger = logging.getLogger(__name__)
+
class RedirectHandler(tornado.web.RequestHandler):
def initialize(self, redirected_collections=None):
diff --git a/analysis/webservice/redirect/RemoteSDAPCache.py
b/analysis/webservice/redirect/RemoteSDAPCache.py
new file mode 100644
index 0000000..2568ab1
--- /dev/null
+++ b/analysis/webservice/redirect/RemoteSDAPCache.py
@@ -0,0 +1,48 @@
+import requests
+import logging
+from datetime import datetime
+from datetime import timedelta
+from dataclasses import dataclass
+
+logger = logging.getLogger(__name__)
+
+@dataclass
+class RemoteSDAPList:
+    """Collection list obtained from one remote SDAP, with its expiry instant."""
+    # Decoded body of the remote ``/list`` response; RemoteSDAPCache.get
+    # iterates it and looks for a 'shortName' key in each item.
+    list: list
+    # Set by RemoteSDAPCache._add to the fetch time plus max_age seconds.
+    outdated_at: datetime
+
+
+class CollectionNotFound(Exception):
+    """Raised when a collection cannot be obtained from a remote SDAP."""
+
+
+class RemoteSDAPCache:
+    """In-memory cache of the ``/list`` responses of remote SDAP deployments.
+
+    ``sdap_lists`` maps the base url of a remote SDAP (slashes stripped) to
+    the RemoteSDAPList last fetched from it.
+    """
+
+    def __init__(self):
+        self.sdap_lists = {}
+
+    def _add(self, url, timeout=2, max_age=3600*24):
+        """Fetch ``<url>/list`` and store the result in the cache.
+
+        :param url: base url of the remote SDAP, without trailing slash
+        :param timeout: timeout in seconds handed to ``requests.get``
+        :param max_age: number of seconds the fetched list stays valid
+        :raises CollectionNotFound: when the request fails or the remote
+            does not answer with status 200
+        """
+        list_url = f"{url}/list"
+        try:
+            r = requests.get(list_url, timeout=timeout)
+        except requests.exceptions.Timeout as e:
+            # covers connect and read timeouts alike
+            raise CollectionNotFound(f"url {list_url} was not reachable in {timeout} s") from e
+        except requests.exceptions.RequestException as e:
+            # any other transport failure (refused connection, DNS error, ...)
+            # is reported the same way so callers only handle CollectionNotFound
+            raise CollectionNotFound(f"url {list_url} was not reachable: {e}") from e
+
+        if r.status_code != 200:
+            raise CollectionNotFound(
+                f"url {list_url} was not reachable, responded with status {r.status_code}"
+            )
+
+        logger.info("Caching list for sdap %s: %s", list_url, r.text)
+        self.sdap_lists[url] = RemoteSDAPList(
+            list=r.json(),
+            outdated_at=datetime.now() + timedelta(seconds=max_age)
+        )
+
+    def get(self, url, short_name):
+        """Return the description of collection ``short_name`` on remote ``url``.
+
+        The remote list is fetched on first use and again once its
+        ``outdated_at`` instant has been reached; otherwise the cached list
+        is used.
+
+        :param url: base url of the remote SDAP, with or without slashes at its ends
+        :param short_name: value of the 'shortName' key of the wanted collection
+        :return: a copy of the matching collection dict
+        :raises CollectionNotFound: when the remote list cannot be fetched or
+            holds no collection with that short name
+        """
+        stripped_url = url.strip('/')
+        cached = self.sdap_lists.get(stripped_url)
+        # refresh when nothing is cached yet or the entry has expired
+        if cached is None or cached.outdated_at <= datetime.now():
+            self._add(stripped_url)
+            cached = self.sdap_lists[stripped_url]
+
+        for collection in cached.list:
+            if 'shortName' in collection and collection['shortName'] == short_name:
+                # hand out a copy: callers edit the result (e.g. delete its
+                # 'shortName' key), which must not alter the cached entry
+                return dict(collection)
+
+        raise CollectionNotFound(
+            f"collection {short_name} has not been found in url {stripped_url}"
+        )
+
diff --git a/analysis/webservice/redirect/__init__.py
b/analysis/webservice/redirect/__init__.py
index 201e7a2..be5fe45 100644
--- a/analysis/webservice/redirect/__init__.py
+++ b/analysis/webservice/redirect/__init__.py
@@ -1,2 +1,4 @@
from .RedirectHandler import RedirectHandler
-from .RemoteCollectionMatcher import RemoteCollectionMatcher
\ No newline at end of file
+from .RemoteCollectionMatcher import RemoteCollectionMatcher
+from .RemoteSDAPCache import RemoteSDAPCache
+from .RemoteSDAPCache import CollectionNotFound
\ No newline at end of file
diff --git a/analysis/webservice/webapp.py b/analysis/webservice/webapp.py
index 8db6459..e060d3c 100644
--- a/analysis/webservice/webapp.py
+++ b/analysis/webservice/webapp.py
@@ -74,18 +74,27 @@ def main():
define('cassandra_host', help='cassandra host')
define('cassandra_username', help='cassandra username')
define('cassandra_password', help='cassandra password')
- define('collections_path', help='collection config path')
+ define('collections_path', default=None, help='collection config path')
parse_command_line()
algorithm_config = inject_args_in_config(options, algorithm_config)
- remote_collection_matcher =
RemoteCollectionMatcher(options.collections_path)
+ remote_collections = None
+ router_rules = []
+ if options.collections_path:
+ # build redirect app
+ remote_collection_matcher =
RemoteCollectionMatcher(options.collections_path)
+ remote_collections = remote_collection_matcher.get_remote_collections()
+ remote_sdap_app = RedirectAppBuilder(remote_collection_matcher).build(
+ host=options.address,
+ debug=options.debug)
+ router_rules.append(Rule(remote_collection_matcher, remote_sdap_app))
# build nexus app
nexus_app_builder = NexusAppBuilder().set_modules(
web_config.get("modules", "module_dirs").split(","),
algorithm_config,
- remote_collection_matcher.get_remote_collections()
+ remote_collections=remote_collections
)
if web_config.get("static", "static_enabled") == "true":
@@ -96,17 +105,9 @@ def main():
log.info("Static resources disabled")
local_sdap_app = nexus_app_builder.build(host=options.address,
debug=options.debug)
+ router_rules.append(Rule(AnyMatches(), local_sdap_app))
- # build redirect app
- remote_sdap_app = RedirectAppBuilder(remote_collection_matcher).build(
- host=options.address,
- debug=options.debug)
-
- router = RuleRouter([
- Rule(remote_collection_matcher, remote_sdap_app),
- Rule(AnyMatches(), local_sdap_app)
- ]
- )
+ router = RuleRouter(router_rules)
log.info("Initializing on host address '%s'" % options.address)
log.info("Initializing on port '%s'" % options.port)
diff --git a/data-access/requirements.txt b/data-access/requirements.txt
index 74a944b..b9345f3 100644
--- a/data-access/requirements.txt
+++ b/data-access/requirements.txt
@@ -1,6 +1,7 @@
cassandra-driver==3.24.0
pysolr==3.9.0
-elasticsearch
+elasticsearch==8.3.1
+urllib3==1.26.2
requests
nexusproto
Shapely