This is an automated email from the ASF dual-hosted git repository.
tloubrieu pushed a commit to branch SDAP-268
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/SDAP-268 by this push:
new dfa5a40 create handler manager to have multiple endpoints for the same
algorithm
dfa5a40 is described below
commit dfa5a4075a7d0283da46e4c1f1ffe40ba27cfa08
Author: thomas loubrieu <[email protected]>
AuthorDate: Tue Aug 11 13:08:39 2020 -0700
create handler manager to have multiple endpoints for the same algorithm
---
analysis/webservice/NexusHandler.py | 14 ++-
analysis/webservice/algorithms/Capabilities.py | 4 +-
.../webservice/algorithms_spark/TimeAvgMapSpark.py | 18 +---
.../webservice/algorithms_spark/TimeSeriesSpark.py | 4 +-
analysis/webservice/algorithms_spark/__init__.py | 3 +-
analysis/webservice/config/spark_pools.xml | 8 ++
.../request/handlers/NexusHandlerManager.py | 99 ++++++++++++++++++++++
.../nexus_tornado/request/handlers/__init__.py | 3 +-
analysis/webservice/webapp.py | 54 ++----------
data-access/nexustiles/config/spark_pools.xml | 0
10 files changed, 138 insertions(+), 69 deletions(-)
diff --git a/analysis/webservice/NexusHandler.py
b/analysis/webservice/NexusHandler.py
index 42972ec..e4d35d7 100644
--- a/analysis/webservice/NexusHandler.py
+++ b/analysis/webservice/NexusHandler.py
@@ -16,8 +16,11 @@
import logging
import types
+from functools import partial
-AVAILABLE_HANDLERS = []
+AVAILABLE_LEGACY_HANDLERS = []
+AVAILABLE_RESTAPI_HANDLERS = []
+AVAILABLE_WPS_HANDLERS = []
AVAILABLE_INITIALIZERS = []
@@ -32,17 +35,22 @@ def nexus_initializer(clazz):
return clazz
-def nexus_handler(clazz):
+def nexus_handler(clazz, handler_list=AVAILABLE_LEGACY_HANDLERS):
log = logging.getLogger(__name__)
try:
clazz.validate()
log.info("Adding algorithm module '%s' with path '%s' (%s)" %
(clazz.name, clazz.path, clazz))
- AVAILABLE_HANDLERS.append(clazz)
+ handler_list.append(clazz)
except Exception as ex:
log.warn("Handler '%s' is invalid and will be skipped (reason: %s)" %
(clazz, ex.message), exc_info=True)
return clazz
+nexus_restapi_handler = partial(nexus_handler,
handler_list=AVAILABLE_RESTAPI_HANDLERS)
+nexus_wps_handler = partial(nexus_handler, handler_list=AVAILABLE_WPS_HANDLERS)
+
+
+
DEFAULT_PARAMETERS_SPEC = {
"ds": {
"name": "Dataset",
diff --git a/analysis/webservice/algorithms/Capabilities.py
b/analysis/webservice/algorithms/Capabilities.py
index fa85a7c..57d0500 100644
--- a/analysis/webservice/algorithms/Capabilities.py
+++ b/analysis/webservice/algorithms/Capabilities.py
@@ -16,7 +16,7 @@
import json
-from webservice.NexusHandler import nexus_handler, AVAILABLE_HANDLERS
+from webservice.NexusHandler import nexus_handler, AVAILABLE_LEGACY_HANDLERS
from webservice.algorithms.NexusCalcHandler import NexusCalcHandler
from webservice.webmodel import NexusResults
@@ -32,7 +32,7 @@ class CapabilitiesListCalcHandlerImpl(NexusCalcHandler):
def calc(self, computeOptions, **args):
capabilities = []
- for capability in AVAILABLE_HANDLERS:
+ for capability in AVAILABLE_LEGACY_HANDLERS:
capabilityDef = {
"name": capability.name,
"path": capability.path,
diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
index 6231873..3e3f048 100644
--- a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
@@ -20,7 +20,7 @@ import numpy as np
import shapely.geometry
from pytz import timezone
-from webservice.NexusHandler import nexus_handler
+from webservice.NexusHandler import nexus_handler, nexus_restapi_handler,
nexus_wps_handler
from webservice.algorithms_spark.NexusCalcSparkHandler import
NexusCalcSparkHandler
from webservice.webmodel import NexusResults, NexusProcessingException,
NoDataException
@@ -29,6 +29,8 @@ ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
@nexus_handler
+@nexus_restapi_handler
+@nexus_wps_handler
class TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
# __singleton_lock = threading.Lock()
# __singleton_instance = None
@@ -67,19 +69,6 @@ class TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
}
singleton = True
- # @classmethod
- # def instance(cls, algorithm_config=None, sc=None):
- # with cls.__singleton_lock:
- # if not cls.__singleton_instance:
- # try:
- # singleton_instance = cls()
- # singleton_instance.set_config(algorithm_config)
- # singleton_instance.set_spark_context(sc)
- # cls.__singleton_instance = singleton_instance
- # except AttributeError:
- # pass
- # return cls.__singleton_instance
-
def parse_arguments(self, request):
# Parse input arguments
self.log.debug("Parsing arguments")
@@ -138,6 +127,7 @@ class
TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
start_time,
end_time)
+
nexus_tiles =
self._find_global_tile_set(metrics_callback=metrics_record.record_metrics)
if len(nexus_tiles) == 0:
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index 43f7f6d..477c2d4 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -32,7 +32,7 @@ from nexustiles.nexustiles import NexusTileService
from pytz import timezone
from scipy import stats
from webservice import Filtering as filtering
-from webservice.NexusHandler import nexus_handler
+from webservice.NexusHandler import nexus_handler, nexus_restapi_handler,
nexus_wps_handler
from webservice.algorithms_spark.NexusCalcSparkHandler import
NexusCalcSparkHandler
from webservice.webmodel import NexusResults, NoDataException,
NexusProcessingException
@@ -43,6 +43,8 @@ logger = logging.getLogger(__name__)
@nexus_handler
+@nexus_restapi_handler
+@nexus_wps_handler
class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
name = "Time Series Spark"
path = "/timeSeriesSpark"
diff --git a/analysis/webservice/algorithms_spark/__init__.py
b/analysis/webservice/algorithms_spark/__init__.py
index a25c8d5..c8e3fd1 100644
--- a/analysis/webservice/algorithms_spark/__init__.py
+++ b/analysis/webservice/algorithms_spark/__init__.py
@@ -21,11 +21,10 @@ import CorrMapSpark
import DailyDifferenceAverageSpark
import HofMoellerSpark
import MaximaMinimaSpark
-import NexusCalcSparkHandler
import TimeAvgMapSpark
import TimeSeriesSpark
import VarianceSpark
-
+import NexusCalcSparkHandler
log = logging.getLogger(__name__)
diff --git a/analysis/webservice/config/spark_pools.xml
b/analysis/webservice/config/spark_pools.xml
new file mode 100644
index 0000000..50906ad
--- /dev/null
+++ b/analysis/webservice/config/spark_pools.xml
@@ -0,0 +1,8 @@
+<?xml version="1.0"?>
+<allocations>
+ <pool name="default">
+ <schedulingMode>FAIR</schedulingMode>
+ <weight>1</weight>
+ <minShare>2</minShare>
+ </pool>
+</allocations>
\ No newline at end of file
diff --git
a/analysis/webservice/nexus_tornado/request/handlers/NexusHandlerManager.py
b/analysis/webservice/nexus_tornado/request/handlers/NexusHandlerManager.py
new file mode 100644
index 0000000..9f6969e
--- /dev/null
+++ b/analysis/webservice/nexus_tornado/request/handlers/NexusHandlerManager.py
@@ -0,0 +1,99 @@
+import os
+import logging
+import sys
+import importlib
+import pkg_resources
+import tornado.web
+from webservice import NexusHandler
+from webservice.nexus_tornado.request.handlers import NexusRequestHandler
+import webservice.algorithms_spark.NexusCalcSparkHandler
+
+logging.basicConfig(
+ level=logging.DEBUG,
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+ datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
+logger = logging.getLogger(__name__)
+
+
+class VersionHandler(tornado.web.RequestHandler):
+ def get(self):
+ self.write(pkg_resources.get_distribution("nexusanalysis").version)
+
+
+class NexusHandlerManager(object):
+ _spark_context = None
+
+ def __init__(self, module_dirs,
+ algorithm_config, tile_service_factory,
+ max_request_threads=1,
+ static_dir=None):
+
+ for moduleDir in module_dirs:
+ logger.info("Loading modules from %s" % moduleDir)
+ importlib.import_module(moduleDir)
+
+ logger.info("Running Nexus Initializers")
+ NexusHandler.executeInitializers(algorithm_config)
+
+ self._tile_service_factory = tile_service_factory
+
+ logger.info("Initializing request ThreadPool to %s" %
max_request_threads)
+ self._request_thread_pool =
tornado.concurrent.futures.ThreadPoolExecutor(max_request_threads)
+
+ self._static_dir = static_dir
+
+ def get_handlers(self):
+ handlers = self._get_legacy_handlers()
+ handlers.extend(self._get_restapi_handlers())
+
+ handlers.append((r"/version", VersionHandler))
+
+ if self._static_dir:
+ handlers.append(
+ (r'/(.*)', tornado.web.StaticFileHandler, {'path':
self._static_dir, "default_filename": "index.html"}))
+
+ return handlers
+
+ def _get_legacy_handlers(self):
+ return
self.__get_tornado_handlers(NexusHandler.AVAILABLE_LEGACY_HANDLERS, lambda x: x)
+
+ def _get_restapi_handlers(self):
+
+ def path_spark_to_restapi(s):
+ i_spark = s.find('Spark')
+ return '/rest' + s[:i_spark]
+
+ return
self.__get_tornado_handlers(NexusHandler.AVAILABLE_RESTAPI_HANDLERS,
path_spark_to_restapi)
+
+ def __get_tornado_handlers(self, wrappers, path_func):
+ handlers = []
+
+ for clazzWrapper in wrappers:
+ if issubclass(clazzWrapper,
webservice.algorithms_spark.NexusCalcSparkHandler.NexusCalcSparkHandler):
+ spark_context = self._get_spark_context()
+ handlers.append((path_func(clazzWrapper.path),
+ NexusRequestHandler,
+ dict(clazz=clazzWrapper,
+
tile_service_factory=self._tile_service_factory,
+ sc=spark_context,
+ thread_pool=self._request_thread_pool)))
+ else:
+ handlers.append((path_func(clazzWrapper.path),
+ NexusRequestHandler,
+ dict(clazz=clazzWrapper,
+
tile_service_factory=self._tile_service_factory,
+ thread_pool=self._request_thread_pool)))
+
+ return handlers
+
+ def _get_spark_context(self):
+ if self._spark_context is None:
+ from pyspark.sql import SparkSession
+
+ spark = SparkSession.builder.appName("nexus-analysis") \
+ .config("spark.scheduler.mode", "FAIR") \
+ .config("spark.scheduler.allocation.file",
os.path.abspath("webservice/config/spark_pools.xml")) \
+ .getOrCreate()
+ self._spark_context = spark.sparkContext
+
+ return self._spark_context
diff --git a/analysis/webservice/nexus_tornado/request/handlers/__init__.py
b/analysis/webservice/nexus_tornado/request/handlers/__init__.py
index 7c6b1f4..c62028d 100644
--- a/analysis/webservice/nexus_tornado/request/handlers/__init__.py
+++ b/analysis/webservice/nexus_tornado/request/handlers/__init__.py
@@ -1 +1,2 @@
-from .NexusRequestHandler import NexusRequestHandler
\ No newline at end of file
+from .NexusRequestHandler import NexusRequestHandler
+from .NexusHandlerManager import NexusHandlerManager
\ No newline at end of file
diff --git a/analysis/webservice/webapp.py b/analysis/webservice/webapp.py
index c74972d..b8ee1ab 100644
--- a/analysis/webservice/webapp.py
+++ b/analysis/webservice/webapp.py
@@ -17,6 +17,7 @@ import ConfigParser
import importlib
import logging
import sys
+import os
from functools import partial
import pkg_resources
@@ -27,6 +28,7 @@ import webservice.algorithms_spark.NexusCalcSparkHandler
from nexustiles.nexustiles import NexusTileService
from webservice import NexusHandler
from webservice.nexus_tornado.request.handlers import NexusRequestHandler
+from nexus_tornado.request.handlers import NexusHandlerManager
def inject_args_in_config(args, config):
@@ -78,10 +80,7 @@ if __name__ == "__main__":
parse_command_line()
algorithm_config = inject_args_in_config(options, algorithm_config)
- moduleDirs = webconfig.get("modules", "module_dirs").split(",")
- for moduleDir in moduleDirs:
- log.info("Loading modules from %s" % moduleDir)
- importlib.import_module(moduleDir)
+ module_dirs = webconfig.get("modules", "module_dirs").split(",")
staticDir = webconfig.get("static", "static_dir")
staticEnabled = webconfig.get("static", "static_enabled") == "true"
@@ -94,52 +93,15 @@ if __name__ == "__main__":
else:
log.info("Static resources disabled")
- handlers = []
-
- log.info("Running Nexus Initializers")
- NexusHandler.executeInitializers(algorithm_config)
-
max_request_threads = webconfig.getint("global",
"server.max_simultaneous_requests")
- log.info("Initializing request ThreadPool to %s" % max_request_threads)
- request_thread_pool =
tornado.concurrent.futures.ThreadPoolExecutor(max_request_threads)
tile_service_factory = partial(NexusTileService, False, False,
algorithm_config)
- spark_context = None
- for clazzWrapper in NexusHandler.AVAILABLE_HANDLERS:
- if issubclass(clazzWrapper,
webservice.algorithms_spark.NexusCalcSparkHandler.NexusCalcSparkHandler):
- if spark_context is None:
- from pyspark.sql import SparkSession
-
- spark = SparkSession.builder.appName("nexus-analysis")\
- .config("spark.scheduler.mode", "FAIR")\
- .config("spark.scheduler.allocation.file",
"config/spark_pools.xml"))
- .getOrCreate()
- spark_context = spark.sparkContext
-
- handlers.append((clazzWrapper.path,
- NexusRequestHandler,
- dict(clazz=clazzWrapper,
- tile_service_factory=tile_service_factory,
- sc=spark_context,
- thread_pool=request_thread_pool)))
- else:
- handlers.append((clazzWrapper.path,
- NexusRequestHandler,
- dict(clazz=clazzWrapper,
- tile_service_factory=tile_service_factory,
- thread_pool=request_thread_pool)))
-
-
- class VersionHandler(tornado.web.RequestHandler):
- def get(self):
- self.write(pkg_resources.get_distribution("nexusanalysis").version)
-
-
- handlers.append((r"/version", VersionHandler))
- if staticEnabled:
- handlers.append(
- (r'/(.*)', tornado.web.StaticFileHandler, {'path': staticDir,
"default_filename": "index.html"}))
+ nexus_handler_manager = NexusHandlerManager(module_dirs,
+ algorithm_config,
tile_service_factory,
+
max_request_threads=max_request_threads,
+ static_dir=staticDir)
+ handlers = nexus_handler_manager.get_handlers()
app = tornado.web.Application(
handlers,
diff --git a/data-access/nexustiles/config/spark_pools.xml
b/data-access/nexustiles/config/spark_pools.xml
deleted file mode 100644
index e69de29..0000000