Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-185-NullPool-logging-messages-appear-during-execution aea7cebc8 -> 51816fdb6


added between thread messaging


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/51816fdb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/51816fdb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/51816fdb

Branch: refs/heads/ARIA-185-NullPool-logging-messages-appear-during-execution
Commit: 51816fdb6184cb9110c03ca8a133522717e46c8a
Parents: aea7ceb
Author: max-orlov <[email protected]>
Authored: Tue May 23 18:17:08 2017 +0300
Committer: max-orlov <[email protected]>
Committed: Tue May 23 18:45:13 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/context/operation.py          | 13 ++---
 .../execution_plugin/ctx_proxy/client.py        | 54 ++++++++++++--------
 .../execution_plugin/ctx_proxy/server.py        | 38 ++++++++------
 3 files changed, 58 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/51816fdb/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index e0eb927..52d3ab6 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -29,11 +29,11 @@ class BaseOperationContext(BaseContext):
     Context object used during operation creation and execution
     """
 
-    def __init__(self, task_id, actor_id, instantiated_from_dict=False, **kwargs):
+    def __init__(self, task_id, actor_id, **kwargs):
         self._task_id = task_id
         self._actor_id = actor_id
         self._thread_local = threading.local()
-        self._instantiated_from_dict = instantiated_from_dict
+        self._destroy_session = kwargs.pop('destroy_session', False)
         logger_level = kwargs.pop('logger_level', None)
         super(BaseOperationContext, self).__init__(**kwargs)
         self._register_logger(task_id=self.task.id, level=logger_level)
@@ -99,16 +99,9 @@ class BaseOperationContext(BaseContext):
 
         return cls(model_storage=model_storage,
                    resource_storage=resource_storage,
-                   instantiated_from_dict=True,
+                   destroy_session=True,
                    **kwargs)
 
-    def __del__(self):
-        self.close()
-
-    def close(self):
-        if self._instantiated_from_dict:
-            self.model.log._session.remove()
-            self.model.log._engine.dispose()
 
 class NodeOperationContext(BaseOperationContext):
     """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/51816fdb/aria/orchestrator/execution_plugin/ctx_proxy/client.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/client.py b/aria/orchestrator/execution_plugin/ctx_proxy/client.py
index d965a5e..f58c727 100644
--- a/aria/orchestrator/execution_plugin/ctx_proxy/client.py
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/client.py
@@ -34,22 +34,25 @@ class _RequestError(RuntimeError):
         self.ex_traceback = ex_traceback
 
 
-def _http_request(socket_url, request, timeout):
-    response = urllib2.urlopen(
-        url=socket_url,
-        data=json.dumps(request),
-        timeout=timeout)
+def _http_request(socket_url, request, method, timeout):
+    opener = urllib2.build_opener(urllib2.HTTPHandler)
+    request = urllib2.Request(socket_url, data=json.dumps(request))
+    request.get_method = lambda: method
+    response = opener.open(request, timeout=timeout)
+
     if response.code != 200:
         raise RuntimeError('Request failed: {0}'.format(response))
     return json.loads(response.read())
 
 
-def _client_request(socket_url, args, timeout):
+def _client_request(socket_url, args, method, timeout):
     response = _http_request(
         socket_url=socket_url,
         request={'args': args},
-        timeout=timeout)
-    payload = response['payload']
+        method=method,
+        timeout=timeout
+    )
+    payload = response.get('payload')
     response_type = response.get('type')
     if response_type == 'error':
         ex_type = payload['type']
@@ -86,20 +89,29 @@ def _process_args(json_prefix, args):
     return processed_args
 
 
-def main(args=None):
-    args = _parse_args(args)
-    response = _client_request(
-        socket_url=args.socket_url,
-        args=_process_args(args.json_arg_prefix, args.args),
-        timeout=args.timeout)
-    if args.json_output:
-        response = json.dumps(response)
-    else:
-        if not response:
-            response = ''
-        response = str(response)
-    sys.stdout.write(response)
+def _close_session(socket_url, timeout):
+    return _client_request(socket_url, [], 'DELETE', timeout)
 
 
+def main(args=None):
+    try:
+        args = _parse_args(args)
+        response = _client_request(args.socket_url,
+            args=_process_args(args.json_arg_prefix, args.args),
+            method='POST',
+            timeout=args.timeout)
+        if args.json_output:
+            response = json.dumps(response)
+        else:
+            if not response:
+                response = ''
+            response = str(response)
+        sys.stdout.write(response)
+    except BaseException:
+        try:
+            _close_session(args.socket_url, args.timeout)
+        except BaseException:
+            pass
+
 if __name__ == '__main__':
     main()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/51816fdb/aria/orchestrator/execution_plugin/ctx_proxy/server.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
index e3496b9..7296065 100644
--- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
@@ -27,6 +27,7 @@ import bottle
 from aria import modeling
 
 from .. import exceptions
+from . import client
 
 
 class CtxProxy(object):
@@ -76,6 +77,7 @@ class CtxProxy(object):
 
             bottle_app = bottle.Bottle()
             bottle_app.post('/', callback=self._request_handler)
+            bottle_app.delete('/', callback=self._teardown_handler)
             bottle.run(
                 app=bottle_app,
                 host='localhost',
@@ -88,17 +90,31 @@ class CtxProxy(object):
         return thread
 
     def close(self):
+        client._close_session(self.socket_url, 10)
         if self.server:
             self.server.shutdown()
             self.server.server_close()
 
     def _request_handler(self):
+        return self._handle(self._process)
+
+    def _teardown_handler(self):
+        return self._handle(self._close_session)
+
+    def _handle(self, handler):
         request = bottle.request.body.read()  # pylint: disable=no-member
-        response = self._process(request)
+        response = handler(request)
         return bottle.LocalResponse(
-            body=response,
+            body=json.dumps(response or {}, cls=modeling.utils.ModelJSONEncoder),
             status=200,
-            headers={'content-type': 'application/json'})
+            headers={'content-type': 'application/json'}
+        )
+
+    def _close_session(self, *args, **kwargs):
+        # Since this runs in a daemon thread, we need to close the session each time we process
+        # a request (a new session would be supplied by SQLAlchemy scoped_session).
+        # log mapi is chosen arbitrarily.
+        return self.ctx.model.log._session.remove()
 
     def _process(self, request):
         try:
@@ -109,11 +125,9 @@ class CtxProxy(object):
             if isinstance(payload, exceptions.ScriptException):
                 payload = dict(message=str(payload))
                 result_type = 'stop_operation'
-            result = json.dumps({
-                'type': result_type,
-                'payload': payload
-            }, cls=modeling.utils.ModelJSONEncoder)
+            result = {'type': result_type, 'payload': payload}
         except Exception as e:
+            self._close_session()
             traceback_out = StringIO.StringIO()
             traceback.print_exc(file=traceback_out)
             payload = {
@@ -121,15 +135,7 @@ class CtxProxy(object):
                 'message': str(e),
                 'traceback': traceback_out.getvalue()
             }
-            result = json.dumps({
-                'type': 'error',
-                'payload': payload
-            })
-        finally:
-            # Since this runs in a daemon thread, we need to close the session each time we process
-            # a request (a new session would be supplied by SQLAlchemy scoped_session).
-            # log mapi is chosen arbitrarily.
-            self.ctx.model.log._session.remove()
+            result = {'type': 'error', 'payload': payload}
 
         return result
 

Reply via email to