fgreg closed pull request #55: SDAP-164 Refactor Webapp
URL: https://github.com/apache/incubator-sdap-nexus/pull/55
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/analysis/webservice/NexusHandler.py b/analysis/webservice/NexusHandler.py
index eb6373f..89391b8 100644
--- a/analysis/webservice/NexusHandler.py
+++ b/analysis/webservice/NexusHandler.py
@@ -22,8 +22,6 @@
 from netCDF4 import Dataset
 from nexustiles.nexustiles import NexusTileService
 
-from webservice.webmodel import NexusProcessingException
-
 AVAILABLE_HANDLERS = []
 AVAILABLE_INITIALIZERS = []
 
@@ -162,7 +160,7 @@ def description(self):
     def params(self):
         return self.__clazz.params
 
-    def instance(self, algorithm_config=None, sc=None):
+    def instance(self, algorithm_config=None):
         if "singleton" in self.__clazz.__dict__ and 
self.__clazz.__dict__["singleton"] is True:
             if self.__instance is None:
                 self.__instance = self.__clazz()
@@ -172,11 +170,6 @@ def instance(self, algorithm_config=None, sc=None):
                 except AttributeError:
                     pass
 
-                try:
-                    self.__instance.set_spark_context(sc)
-                except AttributeError:
-                    pass
-
             return self.__instance
         else:
             instance = self.__clazz()
@@ -186,10 +179,6 @@ def instance(self, algorithm_config=None, sc=None):
             except AttributeError:
                 pass
 
-            try:
-                self.__instance.set_spark_context(sc)
-            except AttributeError:
-                pass
             return instance
 
     def isValid(self):
@@ -257,65 +246,15 @@ def _mergeResults(self, resultsRaw):
 
 
 class SparkHandler(NexusHandler):
-    class SparkJobContext(object):
-
-        class MaxConcurrentJobsReached(Exception):
-            def __init__(self, *args, **kwargs):
-                Exception.__init__(self, *args, **kwargs)
-
-        def __init__(self, job_stack):
-            self.spark_job_stack = job_stack
-            self.job_name = None
-            self.log = logging.getLogger(__name__)
-
-        def __enter__(self):
-            try:
-                self.job_name = self.spark_job_stack.pop()
-                self.log.debug("Using %s" % self.job_name)
-            except IndexError:
-                raise SparkHandler.SparkJobContext.MaxConcurrentJobsReached()
-            return self
-
-        def __exit__(self, exc_type, exc_val, exc_tb):
-            if self.job_name is not None:
-                self.log.debug("Returning %s" % self.job_name)
-                self.spark_job_stack.append(self.job_name)
 
     def __init__(self, **kwargs):
-        import inspect
         NexusHandler.__init__(self, **kwargs)
         self._sc = None
 
-        self.spark_job_stack = []
-
-        def with_spark_job_context(calc_func):
-            from functools import wraps
-
-            @wraps(calc_func)
-            def wrapped(*args, **kwargs1):
-                try:
-                    with SparkHandler.SparkJobContext(self.spark_job_stack) as job_context:
-                        # TODO Pool and Job are forced to a 1-to-1 relationship
-                        calc_func.im_self._sc.setLocalProperty("spark.scheduler.pool", job_context.job_name)
-                        calc_func.im_self._sc.setJobGroup(job_context.job_name, "a spark job")
-                        return calc_func(*args, **kwargs1)
-                except SparkHandler.SparkJobContext.MaxConcurrentJobsReached:
-                    raise NexusProcessingException(code=503,
-                                                   reason="Max concurrent requests reached. Please try again later.")
-
-            return wrapped
-
-        for member in inspect.getmembers(self, predicate=inspect.ismethod):
-            if member[0] == "calc":
-                setattr(self, member[0], with_spark_job_context(member[1]))
-
     def set_spark_context(self, sc):
         self._sc = sc
 
     def set_config(self, algorithm_config):
-        max_concurrent_jobs = algorithm_config.getint("spark", "maxconcurrentjobs") if algorithm_config.has_section(
-            "spark") and algorithm_config.has_option("spark", "maxconcurrentjobs") else 10
-        self.spark_job_stack = list(["Job %s" % x for x in xrange(1, max_concurrent_jobs + 1)])
         self.algorithm_config = algorithm_config
 
     def _setQueryParams(self, ds, bounds, start_time=None, end_time=None,
@@ -336,7 +275,7 @@ def _setQueryParams(self, ds, bounds, start_time=None, end_time=None,
 
     def _set_info_from_tile_set(self, nexus_tiles):
         ntiles = len(nexus_tiles)
-        self.log.debug('Attempting to extract info from {0} tiles'.\
+        self.log.debug('Attempting to extract info from {0} tiles'. \
                        format(ntiles))
         status = False
         self._latRes = None
@@ -354,7 +293,7 @@ def _set_info_from_tile_set(self, nexus_tiles):
                 if (len(lons) > 1):
                     self._lonRes = abs(lons[1] - lons[0])
             if ((self._latRes is not None) and
-                (self._lonRes is not None)):
+                    (self._lonRes is not None)):
                 lats_agg = np.concatenate([tile.latitudes.compressed()
                                            for tile in nexus_tiles])
                 lons_agg = np.concatenate([tile.longitudes.compressed()
@@ -395,7 +334,8 @@ def _find_global_tile_set(self):
         # Check one time stamp at a time and attempt to extract the global
         # tile set.
         for t in t_in_range:
-            nexus_tiles = self._tile_service.get_tiles_bounded_by_box(self._minLat, self._maxLat, self._minLon, self._maxLon, ds=ds, start_time=t, end_time=t)
+            nexus_tiles = self._tile_service.get_tiles_bounded_by_box(self._minLat, self._maxLat, self._minLon,
+                                                                      self._maxLon, ds=ds, start_time=t, end_time=t)
             if self._set_info_from_tile_set(nexus_tiles):
                 # Successfully retrieved global tile set from nexus_tiles,
                 # so no need to check any other time stamps.
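
With the sc argument dropped from instance(), a Spark-capable handler now has its SparkContext injected after construction via set_spark_context() (see init_spark() in the webapp.py changes further down). A minimal, hypothetical sketch of a handler honoring that contract; only set_config() and set_spark_context() mirror the interface above, the class itself is illustrative:

# Hypothetical handler, not part of the PR; illustrates the refactored contract
# where the SparkContext arrives via set_spark_context() instead of instance(sc=...).
class ExampleSparkHandler(object):
    singleton = True

    def __init__(self):
        self._sc = None

    def set_config(self, algorithm_config):
        self.algorithm_config = algorithm_config

    def set_spark_context(self, sc):
        self._sc = sc

    def calc(self, request):
        if self._sc is None:
            raise RuntimeError("SparkContext has not been injected yet")
        return self._sc.parallelize(range(100)).count()
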
diff --git a/analysis/webservice/algorithms_spark/__init__.py b/analysis/webservice/algorithms_spark/__init__.py
index 656c949..10776a1 100644
--- a/analysis/webservice/algorithms_spark/__init__.py
+++ b/analysis/webservice/algorithms_spark/__init__.py
@@ -32,38 +32,38 @@ def module_exists(module_name):
 if module_exists("pyspark"):
     try:
         import CorrMapSpark
-    except ImportError:
-        pass
+    except ImportError as e:
+        log.warning("Error importing CorrMapSpark", exc_info=e)
 
     try:
         import Matchup
-    except ImportError:
-        pass
+    except ImportError as e:
+        log.warning("Error importing Matchup", exc_info=e)
 
     try:
         import TimeAvgMapSpark
-    except ImportError:
-        pass
+    except ImportError as e:
+        log.warning("Error importing TimeAvgMapSpark", exc_info=e)
 
     try:
         import TimeSeriesSpark
-    except ImportError:
-        pass
+    except ImportError as e:
+        log.warning("Error importing TimeSeriesSpark", exc_info=e)
 
     try:
         import ClimMapSpark
-    except ImportError:
-        pass
+    except ImportError as e:
+        log.warning("Error importing ClimMapSpark", exc_info=e)
 
     try:
         import DailyDifferenceAverageSpark
-    except ImportError:
-        pass
+    except ImportError as e:
+        log.warning("Error importing DailyDifferenceAverageSpark", exc_info=e)
 
     try:
         import HofMoellerSpark
-    except ImportError:
-        pass
+    except ImportError as e:
+        log.warning("Error importing HofMoellerSpark", exc_info=e)
 
 
 else:
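
The optional-module imports above now log the failure instead of silently passing. A standalone sketch of the same pattern, with a hypothetical module name:

import logging

log = logging.getLogger(__name__)

try:
    import SomeOptionalAlgorithm  # hypothetical optional algorithm module
except ImportError as e:
    # Surface the failure in the logs rather than swallowing it
    log.warning("Error importing SomeOptionalAlgorithm", exc_info=e)
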
diff --git a/analysis/webservice/config/algorithms.ini b/analysis/webservice/config/algorithms.ini
index fae7ae5..cb1064f 100644
--- a/analysis/webservice/config/algorithms.ini
+++ b/analysis/webservice/config/algorithms.ini
@@ -1,5 +1,2 @@
 [multiprocessing]
 maxprocesses=8
-
-[spark]
-maxconcurrentjobs=10
\ No newline at end of file
diff --git a/analysis/webservice/config/web.ini b/analysis/webservice/config/web.ini
index a1ecb2c..3a6b689 100644
--- a/analysis/webservice/config/web.ini
+++ b/analysis/webservice/config/web.ini
@@ -1,7 +1,7 @@
 [global]
 server.socket_port=8083
-server.socket_host = '127.0.0.1'
-server.max_simultaneous_requests = 10
+server.socket_host=127.0.0.1
+server.num_sub_processes=1
 
 [livy]
 livy_port = 8998
@@ -14,4 +14,4 @@ static_enabled=true
 static_dir=static
 
 [modules]
-module_dirs=webservice.algorithms,webservice.algorithms_spark
\ No newline at end of file
+module_dirs=webservice.algorithms,webservice.algorithms_spark,webservice.algorithms.doms
\ No newline at end of file
diff --git a/analysis/webservice/webapp.py b/analysis/webservice/webapp.py
index ee7e0a8..d0a2c6f 100644
--- a/analysis/webservice/webapp.py
+++ b/analysis/webservice/webapp.py
@@ -18,13 +18,17 @@
 import importlib
 import json
 import logging
+import signal
 import sys
 import traceback
-from multiprocessing.pool import ThreadPool
+from functools import partial
 
 import matplotlib
 import pkg_resources
+import tornado.process
 import tornado.web
+from tornado.httpserver import HTTPServer
+from tornado.ioloop import IOLoop
 from tornado.options import define, options, parse_command_line
 
 from webservice import NexusHandler
@@ -45,15 +49,12 @@ class ContentTypes(object):
 class BaseHandler(tornado.web.RequestHandler):
     path = r"/"
 
-    def initialize(self, thread_pool):
+    def initialize(self):
         self.logger = logging.getLogger('nexus')
-        self.request_thread_pool = thread_pool
 
     @tornado.web.asynchronous
     def get(self):
-
-        self.logger.info("Received request %s" % self._request_summary())
-        self.request_thread_pool.apply_async(self.run)
+        self.run()
 
     def run(self):
         self.set_header("Access-Control-Allow-Origin", "*")
@@ -86,24 +87,17 @@ def async_callback(self, result):
     ''' Override me for standard handlers! '''
 
     def do_get(self, reqObject):
-
-        for root, dirs, files in os.walk("."):
-            for pyfile in [afile for afile in files if afile.endswith(".py")]:
-                print(os.path.join(root, pyfile))
-                with open(os.path.join(root, pyfile), 'r') as original: data = original.read()
-                with open(os.path.join(root, pyfile), 'w') as modified: modified.write(license + "\n" + data)
         pass
 
 
 class ModularNexusHandlerWrapper(BaseHandler):
-    def initialize(self, thread_pool, clazz=None, algorithm_config=None, sc=None):
-        BaseHandler.initialize(self, thread_pool)
+    def initialize(self, clazz=None, algorithm_config=None):
+        BaseHandler.initialize(self)
         self.__algorithm_config = algorithm_config
         self.__clazz = clazz
-        self.__sc = sc
 
     def do_get(self, request):
-        instance = self.__clazz.instance(algorithm_config=self.__algorithm_config, sc=self.__sc)
+        instance = self.__clazz.instance(algorithm_config=self.__algorithm_config)
 
         results = instance.calc(request)
 
@@ -159,9 +153,63 @@ def async_callback(self, result):
             result.cleanup()
 
 
+def init_spark():
+    from pyspark import SparkContext, SparkConf
+
+    # Configure Spark
+    start_port = 4040
+    __sp_conf = SparkConf()
+    __sp_conf.set("spark.executor.memory", "6g")
+    if tornado.process.task_id():
+        logging.info("Configuring spark context for task {}".format(tornado.process.task_id()))
+        __sp_conf.setAppName("nexus-analysis-{}".format(tornado.process.task_id()))
+        __sp_conf.set("spark.ui.port", "{}".format(start_port + tornado.process.task_id()))
+    else:
+        logging.info("Configuring spark context")
+        __sp_conf.setAppName("nexus-analysis")
+
+    sc = SparkContext(conf=__sp_conf)
+    sc.setLogLevel("INFO")
+
+    for wrapper in NexusHandler.AVAILABLE_HANDLERS:
+        if issubclass(wrapper.clazz(), NexusHandler.SparkHandler):
+            try:
+                wrapper.instance().set_spark_context(sc)
+            except AttributeError as e:
+                raise e
+    return sc
+
+
+def shutdown(sig, frame, sserver=None, sc=None):
+    """
+    Graceful shutdown of all processes.
+
+    :param sserver:
+    :param sc:
+    :param frame:
+    :param sig:
+    :return:
+    """
+    # Stop spark context
+    if sc:
+        sc.stop()
+
+    # Parent process
+    if tornado.process.task_id() is None:
+        logging.info("Received Signal %s Stopping server...", sig)
+        # Stop accepting new connections
+        sserver.stop()
+        # Stop the IO Loop
+        IOLoop.current().stop()
+    # Child Process(es)
+    else:
+        # Exit normally
+        sys.exit(0)
+
+
 if __name__ == "__main__":
     logging.basicConfig(
-        level=logging.DEBUG,
+        level=logging.INFO,
         format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
         datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
 
@@ -176,6 +224,8 @@ def async_callback(self, result):
     define("debug", default=False, help="run in debug mode")
     define("port", default=webconfig.get("global", "server.socket_port"), 
help="run on the given port", type=int)
     define("address", default=webconfig.get("global", "server.socket_host"), 
help="Bind to the given address")
+    define("subprocesses", default=webconfig.get("global", 
"server.num_sub_processes"),
+           help="Number of http server subprocesses", type=int)
     parse_command_line()
 
     moduleDirs = webconfig.get("modules", "module_dirs").split(",")
@@ -199,34 +249,17 @@ def async_callback(self, result):
     log.info("Running Nexus Initializers")
     NexusHandler.executeInitializers(algorithm_config)
 
-    max_request_threads = webconfig.getint("global", "server.max_simultaneous_requests")
-    log.info("Initializing request ThreadPool to %s" % max_request_threads)
-    request_thread_pool = ThreadPool(processes=max_request_threads)
-
-    spark_context = None
+    spark_handlers_loaded = False
     for clazzWrapper in NexusHandler.AVAILABLE_HANDLERS:
+        handlers.append(
+            (clazzWrapper.path(), ModularNexusHandlerWrapper,
+             dict(clazz=clazzWrapper, algorithm_config=algorithm_config)))
         if issubclass(clazzWrapper.clazz(), NexusHandler.SparkHandler):
-            if spark_context is None:
-                from pyspark import SparkContext, SparkConf
-
-                # Configure Spark
-                sp_conf = SparkConf()
-                sp_conf.setAppName("nexus-analysis")
-                sp_conf.set("spark.scheduler.mode", "FAIR")
-                sp_conf.set("spark.executor.memory", "6g")
-                spark_context = SparkContext(conf=sp_conf)
-
-            handlers.append(
-                (clazzWrapper.path(), ModularNexusHandlerWrapper,
-                 dict(clazz=clazzWrapper, algorithm_config=algorithm_config, sc=spark_context,
-                      thread_pool=request_thread_pool)))
-        else:
-            handlers.append(
-                (clazzWrapper.path(), ModularNexusHandlerWrapper,
-                 dict(clazz=clazzWrapper, algorithm_config=algorithm_config, thread_pool=request_thread_pool)))
+            spark_handlers_loaded = True
 
 
     class VersionHandler(tornado.web.RequestHandler):
+
         def get(self):
             self.write(pkg_resources.get_distribution("nexusanalysis").version)
 
@@ -239,10 +272,22 @@ def get(self):
 
     app = tornado.web.Application(
         handlers,
-        default_host=options.address,
         debug=options.debug
     )
-    app.listen(options.port)
-
     log.info("Starting HTTP listener...")
-    tornado.ioloop.IOLoop.current().start()
+
+    server = HTTPServer(app)
+    server.bind(options.port, address=options.address)
+
+    signal.signal(signal.SIGTERM, partial(shutdown, sserver=server, sc=None))
+    signal.signal(signal.SIGINT, partial(shutdown, sserver=server, sc=None))
+
+    server.start(int(options.subprocesses))  # Forks multiple sub-processes
+
+    # Needs to be after fork
+    if spark_handlers_loaded:
+        _sc = init_spark()
+        # Spark overrides the SIGINT handler from earlier, reset it
+        signal.signal(signal.SIGINT, partial(shutdown, sserver=server, sc=_sc))
+
+    IOLoop.current().start()
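
For orientation, the startup flow introduced in webapp.py above boils down to the following simplified sketch (handler registration and config parsing omitted; the port, subprocess count and Spark settings are illustrative, not the PR code verbatim):

# Simplified sketch of the refactored startup flow, assuming tornado and pyspark
# are installed.
import signal
import sys
from functools import partial

import tornado.process
import tornado.web
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop


def init_spark():
    # Created after the fork so every subprocess owns its own SparkContext.
    from pyspark import SparkConf, SparkContext
    conf = SparkConf().setAppName("nexus-analysis-{}".format(tornado.process.task_id() or 0))
    return SparkContext(conf=conf)


def shutdown(sig, frame, sserver=None, sc=None):
    if sc:
        sc.stop()
    if tornado.process.task_id() is None:  # parent process
        sserver.stop()
        IOLoop.current().stop()
    else:                                  # forked child
        sys.exit(0)


if __name__ == "__main__":
    app = tornado.web.Application([], debug=False)  # handlers omitted in this sketch
    server = HTTPServer(app)
    server.bind(8083)
    signal.signal(signal.SIGTERM, partial(shutdown, sserver=server))
    server.start(2)        # forks subprocesses; with more than one, the code below runs in each child
    sc = init_spark()      # per-process Spark context, created only after the fork
    signal.signal(signal.SIGINT, partial(shutdown, sserver=server, sc=sc))
    IOLoop.current().start()
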
diff --git a/docker/nexus-webapp/mesos/agent/docker-entrypoint.sh b/docker/nexus-webapp/mesos/agent/docker-entrypoint.sh
index 2228b0d..3ce4313 100755
--- a/docker/nexus-webapp/mesos/agent/docker-entrypoint.sh
+++ b/docker/nexus-webapp/mesos/agent/docker-entrypoint.sh
@@ -18,10 +18,13 @@
 set -ebx
 
 if [ -n "$TORNADO_HOST" ]; then
-  sed -i "s/server.socket_host = .*/server.socket_host = '${TORNADO_HOST}'/g" ${NEXUS_SRC}/analysis/webservice/config/web.ini
+  sed -i "s/server.socket_host.*/server.socket_host=${TORNADO_HOST}/g" ${NEXUS_SRC}/analysis/webservice/config/web.ini
+fi
+if [ -n "$MAX_PROCESSES" ]; then
+  sed -i "s/server.num_sub_processes.*/server.num_sub_processes=${MAX_PROCESSES}/g" ${NEXUS_SRC}/analysis/webservice/config/web.ini
 fi
 sed -i "s/host=127.0.0.1/host=$CASSANDRA_CONTACT_POINTS/g" 
${NEXUS_SRC}/data-access/nexustiles/config/datastores.ini && \
-sed -i "s/local_datacenter=.*/local_datacenter=$CASSANDRA_LOCAL_DATACENTER/g" 
${NEXUS_SRC}/data-access/nexustiles/config/datastores.ini && \
+sed -i "s/local_datacenter.*/local_datacenter=$CASSANDRA_LOCAL_DATACENTER/g" 
${NEXUS_SRC}/data-access/nexustiles/config/datastores.ini && \
 sed -i "s/host=localhost:8983/host=$SOLR_URL_PORT/g" 
${NEXUS_SRC}/data-access/nexustiles/config/datastores.ini
 
 # DOMS
diff --git a/docker/nexus-webapp/mesos/webapp/docker-entrypoint.sh b/docker/nexus-webapp/mesos/webapp/docker-entrypoint.sh
index 0589fb2..e4d1abb 100755
--- a/docker/nexus-webapp/mesos/webapp/docker-entrypoint.sh
+++ b/docker/nexus-webapp/mesos/webapp/docker-entrypoint.sh
@@ -15,13 +15,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-set -e
+set -ebx
 
 if [ -n "$TORNADO_HOST" ]; then
-  sed -i "s/server.socket_host = .*/server.socket_host = '${TORNADO_HOST}'/g" ${NEXUS_SRC}/analysis/webservice/config/web.ini
+  sed -i "s/server.socket_host.*/server.socket_host=${TORNADO_HOST}/g" ${NEXUS_SRC}/analysis/webservice/config/web.ini
+fi
+if [ -n "$MAX_PROCESSES" ]; then
+  sed -i "s/server.num_sub_processes.*/server.num_sub_processes=${MAX_PROCESSES}/g" ${NEXUS_SRC}/analysis/webservice/config/web.ini
 fi
 sed -i "s/host=127.0.0.1/host=$CASSANDRA_CONTACT_POINTS/g" 
${NEXUS_SRC}/data-access/nexustiles/config/datastores.ini && \
-sed -i "s/local_datacenter=.*/local_datacenter=$CASSANDRA_LOCAL_DATACENTER/g" 
${NEXUS_SRC}/data-access/nexustiles/config/datastores.ini && \
+sed -i "s/local_datacenter.*/local_datacenter=$CASSANDRA_LOCAL_DATACENTER/g" 
${NEXUS_SRC}/data-access/nexustiles/config/datastores.ini && \
 sed -i "s/host=localhost:8983/host=$SOLR_URL_PORT/g" 
${NEXUS_SRC}/data-access/nexustiles/config/datastores.ini
 
 # DOMS
diff --git a/docker/nexus-webapp/standalone/docker-entrypoint.sh b/docker/nexus-webapp/standalone/docker-entrypoint.sh
index 0589fb2..e4d1abb 100755
--- a/docker/nexus-webapp/standalone/docker-entrypoint.sh
+++ b/docker/nexus-webapp/standalone/docker-entrypoint.sh
@@ -15,13 +15,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-set -e
+set -ebx
 
 if [ -n "$TORNADO_HOST" ]; then
-  sed -i "s/server.socket_host = .*/server.socket_host = '${TORNADO_HOST}'/g" ${NEXUS_SRC}/analysis/webservice/config/web.ini
+  sed -i "s/server.socket_host.*/server.socket_host=${TORNADO_HOST}/g" ${NEXUS_SRC}/analysis/webservice/config/web.ini
+fi
+if [ -n "$MAX_PROCESSES" ]; then
+  sed -i "s/server.num_sub_processes.*/server.num_sub_processes=${MAX_PROCESSES}/g" ${NEXUS_SRC}/analysis/webservice/config/web.ini
 fi
 sed -i "s/host=127.0.0.1/host=$CASSANDRA_CONTACT_POINTS/g" 
${NEXUS_SRC}/data-access/nexustiles/config/datastores.ini && \
-sed -i "s/local_datacenter=.*/local_datacenter=$CASSANDRA_LOCAL_DATACENTER/g" 
${NEXUS_SRC}/data-access/nexustiles/config/datastores.ini && \
+sed -i "s/local_datacenter.*/local_datacenter=$CASSANDRA_LOCAL_DATACENTER/g" 
${NEXUS_SRC}/data-access/nexustiles/config/datastores.ini && \
 sed -i "s/host=localhost:8983/host=$SOLR_URL_PORT/g" 
${NEXUS_SRC}/data-access/nexustiles/config/datastores.ini
 
 # DOMS


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
