Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-117-Log-model-should-have-an-Task-field 174f16edf -> 100957214 (forced update)
removed sleep, and added some complexity to the caching mechanism Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/10095721 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/10095721 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/10095721 Branch: refs/heads/ARIA-117-Log-model-should-have-an-Task-field Commit: 10095721462b49c3d7d75bf2d1400660f455b1da Parents: 62f0141 Author: max-orlov <[email protected]> Authored: Sun Mar 12 16:32:08 2017 +0200 Committer: max-orlov <[email protected]> Committed: Sun Mar 12 18:45:14 2017 +0200 ---------------------------------------------------------------------- aria/orchestrator/context/operation.py | 14 ++++++++++---- aria/orchestrator/execution_plugin/common.py | 6 +++--- .../orchestrator/execution_plugin/ctx_proxy/server.py | 8 +++++++- aria/orchestrator/execution_plugin/local.py | 1 - aria/orchestrator/workflows/executor/thread.py | 4 ---- tests/orchestrator/execution_plugin/test_local.py | 5 ++++- 6 files changed, 24 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/10095721/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index ed0791c..d2716e8 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -17,6 +17,8 @@ Workflow and operation contexts """ +import threading + import aria from aria.utils import file from .common import BaseContext @@ -44,7 +46,7 @@ class BaseOperationContext(BaseContext): **kwargs) self._task_id = task_id self._actor_id = actor_id - self._task = None + self._thread_local = threading.local() self._execution_id = execution_id 
self._register_logger(task_id=self.task.id) @@ -64,9 +66,13 @@ class BaseOperationContext(BaseContext): The task in the model storage :return: Task model """ - if not self._task: - self._task = self.model.task.get(self._task_id) - return self._task + # SQLAlchemy prevents accessing an object that was created on a different thread. + # So we retrieve the object from the storage if the current thread isn't the same as the + # original thread. + + if not hasattr(self._thread_local, 'task'): + self._thread_local.task = self.model.task.get(self._task_id) + return self._thread_local.task @property def plugin_workdir(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/10095721/aria/orchestrator/execution_plugin/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/common.py b/aria/orchestrator/execution_plugin/common.py index 47cb631..7d33f2c 100644 --- a/aria/orchestrator/execution_plugin/common.py +++ b/aria/orchestrator/execution_plugin/common.py @@ -133,14 +133,14 @@ def check_error(ctx, error_check_func=None, reraise=False): _error = ctx._error # this happens when a script calls task.abort/task.retry more than once if isinstance(_error, RuntimeError): - ctx.task._original_abort(str(_error)) + ctx.task.abort(str(_error)) # ScriptException is populated by the ctx proxy server when task.abort or task.retry # are called elif isinstance(_error, exceptions.ScriptException): if _error.retry: - ctx.task._original_retry(_error.message, _error.retry_interval) + ctx.task.retry(_error.message, _error.retry_interval) else: - ctx.task._original_abort(_error.message) + ctx.task.abort(_error.message) # local and ssh operations may pass an additional logic check for errors here if error_check_func: error_check_func() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/10095721/aria/orchestrator/execution_plugin/ctx_proxy/server.py 
---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py index 2782ae3..5423cbd 100644 --- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py +++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py @@ -25,7 +25,10 @@ import wsgiref.simple_server import bottle -from .. import exceptions +from .. import ( + exceptions, + common +) class CtxProxy(object): @@ -69,6 +72,9 @@ class CtxProxy(object): server.serve_forever(poll_interval=0.1) def serve(): + # Since task is a thread_local object, we need to patch it inside the server thread. + common.patch_ctx(self.ctx) + bottle_app = bottle.Bottle() bottle_app.post('/', callback=self._request_handler) bottle.run( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/10095721/aria/orchestrator/execution_plugin/local.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/local.py b/aria/orchestrator/execution_plugin/local.py index bc2d661..2101d21 100644 --- a/aria/orchestrator/execution_plugin/local.py +++ b/aria/orchestrator/execution_plugin/local.py @@ -75,7 +75,6 @@ def _execute_func(script_path, ctx, process, operation_kwargs): env = os.environ.copy() env.update(process['env']) ctx.logger.info('Executing: {0}'.format(command)) - common.patch_ctx(ctx) with ctx_proxy.server.CtxProxy(ctx) as proxy: env[ctx_proxy.client.CTX_SOCKET_URL] = proxy.socket_url running_process = subprocess.Popen( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/10095721/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py index 16b22e3..6c59986 100644 --- a/aria/orchestrator/workflows/executor/thread.py +++ 
b/aria/orchestrator/workflows/executor/thread.py @@ -17,7 +17,6 @@ Thread based executor """ -import time import Queue import threading @@ -59,9 +58,6 @@ class ThreadExecutor(BaseExecutor): self._task_started(task) try: task_func = imports.load_attribute(task.implementation) - # Some of the changes (mainly the logs fail to propagate if not enough time - # is given - time.sleep(0.1) task_func(ctx=task.context, **task.inputs) self._task_succeeded(task) except BaseException as e: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/10095721/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index 5224496..da612ef 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -18,7 +18,10 @@ import os import pytest -from aria import workflow +from aria import ( + workflow, + storage +) from aria.orchestrator import events from aria.orchestrator.workflows import api from aria.orchestrator.workflows.exceptions import ExecutorException
