This is an automated email from the ASF dual-hosted git repository.

tloubrieu pushed a commit to branch SDAP-268
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/SDAP-268 by this push:
     new dfa5a40  create handler manager to have multiple endpoint for the same algorithm
dfa5a40 is described below

commit dfa5a4075a7d0283da46e4c1f1ffe40ba27cfa08
Author: thomas loubrieu <[email protected]>
AuthorDate: Tue Aug 11 13:08:39 2020 -0700

    create handler manager to have multiple endpoint for the same algorithm
---
 analysis/webservice/NexusHandler.py                | 14 ++-
 analysis/webservice/algorithms/Capabilities.py     |  4 +-
 .../webservice/algorithms_spark/TimeAvgMapSpark.py | 18 +---
 .../webservice/algorithms_spark/TimeSeriesSpark.py |  4 +-
 analysis/webservice/algorithms_spark/__init__.py   |  3 +-
 analysis/webservice/config/spark_pools.xml         |  8 ++
 .../request/handlers/NexusHandlerManager.py        | 99 ++++++++++++++++++++++
 .../nexus_tornado/request/handlers/__init__.py     |  3 +-
 analysis/webservice/webapp.py                      | 54 ++----------
 data-access/nexustiles/config/spark_pools.xml      |  0
 10 files changed, 138 insertions(+), 69 deletions(-)

diff --git a/analysis/webservice/NexusHandler.py 
b/analysis/webservice/NexusHandler.py
index 42972ec..e4d35d7 100644
--- a/analysis/webservice/NexusHandler.py
+++ b/analysis/webservice/NexusHandler.py
@@ -16,8 +16,11 @@
 
 import logging
 import types
+from functools import partial
 
-AVAILABLE_HANDLERS = []
+AVAILABLE_LEGACY_HANDLERS = []
+AVAILABLE_RESTAPI_HANDLERS = []
+AVAILABLE_WPS_HANDLERS = []
 AVAILABLE_INITIALIZERS = []
 
 
@@ -32,17 +35,22 @@ def nexus_initializer(clazz):
     return clazz
 
 
-def nexus_handler(clazz):
+def nexus_handler(clazz, handler_list=AVAILABLE_LEGACY_HANDLERS):
     log = logging.getLogger(__name__)
     try:
         clazz.validate()
         log.info("Adding algorithm module '%s' with path '%s' (%s)" % 
(clazz.name, clazz.path, clazz))
-        AVAILABLE_HANDLERS.append(clazz)
+        handler_list.append(clazz)
     except Exception as ex:
         log.warn("Handler '%s' is invalid and will be skipped (reason: %s)" % 
(clazz, ex.message), exc_info=True)
     return clazz
 
 
+nexus_restapi_handler = partial(nexus_handler, 
handler_list=AVAILABLE_RESTAPI_HANDLERS)
+nexus_wps_handler = partial(nexus_handler, handler_list=AVAILABLE_WPS_HANDLERS)
+
+
+
 DEFAULT_PARAMETERS_SPEC = {
     "ds": {
         "name": "Dataset",
diff --git a/analysis/webservice/algorithms/Capabilities.py 
b/analysis/webservice/algorithms/Capabilities.py
index fa85a7c..57d0500 100644
--- a/analysis/webservice/algorithms/Capabilities.py
+++ b/analysis/webservice/algorithms/Capabilities.py
@@ -16,7 +16,7 @@
 
 import json
 
-from webservice.NexusHandler import nexus_handler, AVAILABLE_HANDLERS
+from webservice.NexusHandler import nexus_handler, AVAILABLE_LEGACY_HANDLERS
 from webservice.algorithms.NexusCalcHandler import NexusCalcHandler
 from webservice.webmodel import NexusResults
 
@@ -32,7 +32,7 @@ class CapabilitiesListCalcHandlerImpl(NexusCalcHandler):
     def calc(self, computeOptions, **args):
         capabilities = []
 
-        for capability in AVAILABLE_HANDLERS:
+        for capability in AVAILABLE_LEGACY_HANDLERS:
             capabilityDef = {
                 "name": capability.name,
                 "path": capability.path,
diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py 
b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
index 6231873..3e3f048 100644
--- a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
@@ -20,7 +20,7 @@ import numpy as np
 import shapely.geometry
 from pytz import timezone
 
-from webservice.NexusHandler import nexus_handler
+from webservice.NexusHandler import nexus_handler, nexus_restapi_handler, 
nexus_wps_handler
 from webservice.algorithms_spark.NexusCalcSparkHandler import 
NexusCalcSparkHandler
 from webservice.webmodel import NexusResults, NexusProcessingException, 
NoDataException
 
@@ -29,6 +29,8 @@ ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
 
 
 @nexus_handler
+@nexus_restapi_handler
+@nexus_wps_handler
 class TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
     # __singleton_lock = threading.Lock()
     # __singleton_instance = None
@@ -67,19 +69,6 @@ class TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
     }
     singleton = True
 
-    # @classmethod
-    # def instance(cls, algorithm_config=None, sc=None):
-    #     with cls.__singleton_lock:
-    #         if not cls.__singleton_instance:
-    #             try:
-    #                 singleton_instance = cls()
-    #                 singleton_instance.set_config(algorithm_config)
-    #                 singleton_instance.set_spark_context(sc)
-    #                 cls.__singleton_instance = singleton_instance
-    #             except AttributeError:
-    #                 pass
-    #     return cls.__singleton_instance
-
     def parse_arguments(self, request):
         # Parse input arguments
         self.log.debug("Parsing arguments")
@@ -138,6 +127,7 @@ class 
TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler):
                              start_time,
                              end_time)
 
+
         nexus_tiles = 
self._find_global_tile_set(metrics_callback=metrics_record.record_metrics)
 
         if len(nexus_tiles) == 0:
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py 
b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index 43f7f6d..477c2d4 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -32,7 +32,7 @@ from nexustiles.nexustiles import NexusTileService
 from pytz import timezone
 from scipy import stats
 from webservice import Filtering as filtering
-from webservice.NexusHandler import nexus_handler
+from webservice.NexusHandler import nexus_handler, nexus_restapi_handler, 
nexus_wps_handler
 from webservice.algorithms_spark.NexusCalcSparkHandler import 
NexusCalcSparkHandler
 from webservice.webmodel import NexusResults, NoDataException, 
NexusProcessingException
 
@@ -43,6 +43,8 @@ logger = logging.getLogger(__name__)
 
 
 @nexus_handler
+@nexus_restapi_handler
+@nexus_wps_handler
 class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler):
     name = "Time Series Spark"
     path = "/timeSeriesSpark"
diff --git a/analysis/webservice/algorithms_spark/__init__.py 
b/analysis/webservice/algorithms_spark/__init__.py
index a25c8d5..c8e3fd1 100644
--- a/analysis/webservice/algorithms_spark/__init__.py
+++ b/analysis/webservice/algorithms_spark/__init__.py
@@ -21,11 +21,10 @@ import CorrMapSpark
 import DailyDifferenceAverageSpark
 import HofMoellerSpark
 import MaximaMinimaSpark
-import NexusCalcSparkHandler
 import TimeAvgMapSpark
 import TimeSeriesSpark
 import VarianceSpark
-
+import NexusCalcSparkHandler
 
 log = logging.getLogger(__name__)
 
diff --git a/analysis/webservice/config/spark_pools.xml 
b/analysis/webservice/config/spark_pools.xml
new file mode 100644
index 0000000..50906ad
--- /dev/null
+++ b/analysis/webservice/config/spark_pools.xml
@@ -0,0 +1,8 @@
+<?xml version="1.0"?>
+<allocations>
+  <pool name="default">
+    <schedulingMode>FAIR</schedulingMode>
+    <weight>1</weight>
+    <minShare>2</minShare>
+  </pool>
+</allocations>
\ No newline at end of file
diff --git 
a/analysis/webservice/nexus_tornado/request/handlers/NexusHandlerManager.py 
b/analysis/webservice/nexus_tornado/request/handlers/NexusHandlerManager.py
new file mode 100644
index 0000000..9f6969e
--- /dev/null
+++ b/analysis/webservice/nexus_tornado/request/handlers/NexusHandlerManager.py
@@ -0,0 +1,99 @@
+import os
+import logging
+import sys
+import importlib
+import pkg_resources
+import tornado.web
+from webservice import NexusHandler
+from webservice.nexus_tornado.request.handlers import NexusRequestHandler
+import webservice.algorithms_spark.NexusCalcSparkHandler
+
+logging.basicConfig(
+    level=logging.DEBUG,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
+logger = logging.getLogger(__name__)
+
+
+class VersionHandler(tornado.web.RequestHandler):
+    def get(self):
+        self.write(pkg_resources.get_distribution("nexusanalysis").version)
+
+
+class NexusHandlerManager(object):
+    _spark_context = None
+
+    def __init__(self, module_dirs,
+                 algorithm_config, tile_service_factory,
+                 max_request_threads=1,
+                 static_dir=None):
+
+        for moduleDir in module_dirs:
+            logger.info("Loading modules from %s" % moduleDir)
+            importlib.import_module(moduleDir)
+
+        logger.info("Running Nexus Initializers")
+        NexusHandler.executeInitializers(algorithm_config)
+
+        self._tile_service_factory = tile_service_factory
+
+        logger.info("Initializing request ThreadPool to %s" % 
max_request_threads)
+        self._request_thread_pool = 
tornado.concurrent.futures.ThreadPoolExecutor(max_request_threads)
+
+        self._static_dir = static_dir
+
+    def get_handlers(self):
+        handlers = self._get_legacy_handlers()
+        handlers.extend(self._get_restapi_handlers())
+
+        handlers.append((r"/version", VersionHandler))
+
+        if self._static_dir:
+            handlers.append(
+                (r'/(.*)', tornado.web.StaticFileHandler, {'path': 
self._static_dir, "default_filename": "index.html"}))
+
+        return handlers
+
+    def _get_legacy_handlers(self):
+        return 
self.__get_tornado_handlers(NexusHandler.AVAILABLE_LEGACY_HANDLERS, lambda x: x)
+
+    def _get_restapi_handlers(self):
+
+        def path_spark_to_restapi(s):
+            i_spark = s.find('Spark')
+            return '/rest' + s[:i_spark]
+
+        return 
self.__get_tornado_handlers(NexusHandler.AVAILABLE_RESTAPI_HANDLERS, 
path_spark_to_restapi)
+
+    def __get_tornado_handlers(self, wrappers, path_func):
+        handlers = []
+
+        for clazzWrapper in wrappers:
+            if issubclass(clazzWrapper, 
webservice.algorithms_spark.NexusCalcSparkHandler.NexusCalcSparkHandler):
+                spark_context = self._get_spark_context()
+                handlers.append((path_func(clazzWrapper.path),
+                                 NexusRequestHandler,
+                                 dict(clazz=clazzWrapper,
+                                      
tile_service_factory=self._tile_service_factory,
+                                      sc=spark_context,
+                                      thread_pool=self._request_thread_pool)))
+            else:
+                handlers.append((path_func(clazzWrapper.path),
+                                 NexusRequestHandler,
+                                 dict(clazz=clazzWrapper,
+                                      
tile_service_factory=self._tile_service_factory,
+                                      thread_pool=self._request_thread_pool)))
+
+        return handlers
+
+    def _get_spark_context(self):
+        if self._spark_context is None:
+            from pyspark.sql import SparkSession
+
+            spark = SparkSession.builder.appName("nexus-analysis") \
+                .config("spark.scheduler.mode", "FAIR") \
+                .config("spark.scheduler.allocation.file", 
os.path.abspath("webservice/config/spark_pools.xml")) \
+                .getOrCreate()
+            self._spark_context = spark.sparkContext
+
+        return self._spark_context
diff --git a/analysis/webservice/nexus_tornado/request/handlers/__init__.py 
b/analysis/webservice/nexus_tornado/request/handlers/__init__.py
index 7c6b1f4..c62028d 100644
--- a/analysis/webservice/nexus_tornado/request/handlers/__init__.py
+++ b/analysis/webservice/nexus_tornado/request/handlers/__init__.py
@@ -1 +1,2 @@
-from .NexusRequestHandler import NexusRequestHandler
\ No newline at end of file
+from .NexusRequestHandler import NexusRequestHandler
+from .NexusHandlerManager import NexusHandlerManager
\ No newline at end of file
diff --git a/analysis/webservice/webapp.py b/analysis/webservice/webapp.py
index c74972d..b8ee1ab 100644
--- a/analysis/webservice/webapp.py
+++ b/analysis/webservice/webapp.py
@@ -17,6 +17,7 @@ import ConfigParser
 import importlib
 import logging
 import sys
+import os
 from functools import partial
 
 import pkg_resources
@@ -27,6 +28,7 @@ import webservice.algorithms_spark.NexusCalcSparkHandler
 from nexustiles.nexustiles import NexusTileService
 from webservice import NexusHandler
 from webservice.nexus_tornado.request.handlers import NexusRequestHandler
+from nexus_tornado.request.handlers import NexusHandlerManager
 
 
 def inject_args_in_config(args, config):
@@ -78,10 +80,7 @@ if __name__ == "__main__":
     parse_command_line()
     algorithm_config = inject_args_in_config(options, algorithm_config)
 
-    moduleDirs = webconfig.get("modules", "module_dirs").split(",")
-    for moduleDir in moduleDirs:
-        log.info("Loading modules from %s" % moduleDir)
-        importlib.import_module(moduleDir)
+    module_dirs = webconfig.get("modules", "module_dirs").split(",")
 
     staticDir = webconfig.get("static", "static_dir")
     staticEnabled = webconfig.get("static", "static_enabled") == "true"
@@ -94,52 +93,15 @@ if __name__ == "__main__":
     else:
         log.info("Static resources disabled")
 
-    handlers = []
-
-    log.info("Running Nexus Initializers")
-    NexusHandler.executeInitializers(algorithm_config)
-
     max_request_threads = webconfig.getint("global", 
"server.max_simultaneous_requests")
-    log.info("Initializing request ThreadPool to %s" % max_request_threads)
-    request_thread_pool = 
tornado.concurrent.futures.ThreadPoolExecutor(max_request_threads)
 
     tile_service_factory = partial(NexusTileService, False, False, 
algorithm_config)
-    spark_context = None
-    for clazzWrapper in NexusHandler.AVAILABLE_HANDLERS:
-        if issubclass(clazzWrapper, 
webservice.algorithms_spark.NexusCalcSparkHandler.NexusCalcSparkHandler):
-            if spark_context is None:
-                from pyspark.sql import SparkSession
-
-                spark = SparkSession.builder.appName("nexus-analysis")\
-                    .config("spark.scheduler.mode", "FAIR")\
-                    .config("spark.scheduler.allocation.file", 
"config/spark_pools.xml"))
-                    .getOrCreate()
-                spark_context = spark.sparkContext
-
-            handlers.append((clazzWrapper.path,
-                             NexusRequestHandler,
-                             dict(clazz=clazzWrapper,
-                                  tile_service_factory=tile_service_factory,
-                                  sc=spark_context,
-                                  thread_pool=request_thread_pool)))
-        else:
-            handlers.append((clazzWrapper.path,
-                             NexusRequestHandler,
-                             dict(clazz=clazzWrapper,
-                                  tile_service_factory=tile_service_factory,
-                                  thread_pool=request_thread_pool)))
-
-
-    class VersionHandler(tornado.web.RequestHandler):
-        def get(self):
-            self.write(pkg_resources.get_distribution("nexusanalysis").version)
-
-
-    handlers.append((r"/version", VersionHandler))
 
-    if staticEnabled:
-        handlers.append(
-            (r'/(.*)', tornado.web.StaticFileHandler, {'path': staticDir, 
"default_filename": "index.html"}))
+    nexus_handler_manager = NexusHandlerManager(module_dirs,
+                                                algorithm_config, 
tile_service_factory,
+                                                
max_request_threads=max_request_threads,
+                                                static_dir=staticDir)
+    handlers = nexus_handler_manager.get_handlers()
 
     app = tornado.web.Application(
         handlers,
diff --git a/data-access/nexustiles/config/spark_pools.xml 
b/data-access/nexustiles/config/spark_pools.xml
deleted file mode 100644
index e69de29..0000000

Reply via email to