Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-117-Log-model-should-have-an-Task-field 90a4f3c48 -> a95e2f9fe (forced update)
removed sleep, and added some complexity to the caching mechanism Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/a95e2f9f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/a95e2f9f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/a95e2f9f Branch: refs/heads/ARIA-117-Log-model-should-have-an-Task-field Commit: a95e2f9fe83fd751ba3f95717d14bf9009278387 Parents: 62f0141 Author: max-orlov <[email protected]> Authored: Sun Mar 12 16:32:08 2017 +0200 Committer: max-orlov <[email protected]> Committed: Mon Mar 13 12:32:09 2017 +0200 ---------------------------------------------------------------------- aria/orchestrator/context/operation.py | 14 ++++++++++---- aria/orchestrator/execution_plugin/common.py | 8 +++----- .../orchestrator/execution_plugin/ctx_proxy/server.py | 6 +++++- aria/orchestrator/execution_plugin/local.py | 3 +-- aria/orchestrator/execution_plugin/ssh/operations.py | 3 +-- aria/orchestrator/workflows/executor/thread.py | 4 ---- tests/orchestrator/execution_plugin/test_local.py | 3 ++- 7 files changed, 22 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a95e2f9f/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index ed0791c..d2716e8 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -17,6 +17,8 @@ Workflow and operation contexts """ +import threading + import aria from aria.utils import file from .common import BaseContext @@ -44,7 +46,7 @@ class BaseOperationContext(BaseContext): **kwargs) self._task_id = task_id self._actor_id = actor_id - self._task = None + self._thread_local = threading.local() self._execution_id = execution_id self._register_logger(task_id=self.task.id) @@ -64,9 +66,13 @@ class BaseOperationContext(BaseContext): The task in the model storage :return: Task model """ - if not self._task: - self._task = self.model.task.get(self._task_id) - return self._task + # SQLAlchemy prevents from accessing an object which was created on a different thread. + # So we retrieve the object from the storage if the current thread isn't the same as the + # original thread. + + if not hasattr(self._thread_local, 'task'): + self._thread_local.task = self.model.task.get(self._task_id) + return self._thread_local.task @property def plugin_workdir(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a95e2f9f/aria/orchestrator/execution_plugin/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/common.py b/aria/orchestrator/execution_plugin/common.py index 47cb631..7915c47 100644 --- a/aria/orchestrator/execution_plugin/common.py +++ b/aria/orchestrator/execution_plugin/common.py @@ -106,8 +106,6 @@ def create_process_config(script_path, process, operation_kwargs, quote_json_env def patch_ctx(ctx): ctx._error = None task = ctx.task - task._original_abort = task.abort - task._original_retry = task.retry def _validate_legal_action(): if ctx._error is not None: @@ -133,14 +131,14 @@ def check_error(ctx, error_check_func=None, reraise=False): _error = ctx._error # this happens when a script calls task.abort/task.retry more than once if isinstance(_error, RuntimeError): - ctx.task._original_abort(str(_error)) + ctx.task.abort(str(_error)) # ScriptException is populated by the ctx proxy server when task.abort or task.retry # are called elif isinstance(_error, exceptions.ScriptException): if _error.retry: - ctx.task._original_retry(_error.message, _error.retry_interval) + ctx.task.retry(_error.message, _error.retry_interval) else: - ctx.task._original_abort(_error.message) + ctx.task.abort(_error.message) # local and ssh operations may pass an additional logic check for errors here if error_check_func: error_check_func() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a95e2f9f/aria/orchestrator/execution_plugin/ctx_proxy/server.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py index 2782ae3..817d064 100644 --- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py +++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py @@ -30,8 +30,9 @@ from .. import exceptions class CtxProxy(object): - def __init__(self, ctx): + def __init__(self, ctx, ctx_patcher=(lambda *args, **kwargs: None)): self.ctx = ctx + self._ctx_patcher = ctx_patcher self.port = _get_unused_port() self.socket_url = 'http://localhost:{0}'.format(self.port) self.server = None @@ -69,6 +70,9 @@ class CtxProxy(object): server.serve_forever(poll_interval=0.1) def serve(): + # Since task is a thread_local object, we need to patch it inside the server thread. + self._ctx_patcher(self.ctx) + bottle_app = bottle.Bottle() bottle_app.post('/', callback=self._request_handler) bottle.run( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a95e2f9f/aria/orchestrator/execution_plugin/local.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/local.py b/aria/orchestrator/execution_plugin/local.py index bc2d661..121e582 100644 --- a/aria/orchestrator/execution_plugin/local.py +++ b/aria/orchestrator/execution_plugin/local.py @@ -75,8 +75,7 @@ def _execute_func(script_path, ctx, process, operation_kwargs): env = os.environ.copy() env.update(process['env']) ctx.logger.info('Executing: {0}'.format(command)) - common.patch_ctx(ctx) - with ctx_proxy.server.CtxProxy(ctx) as proxy: + with ctx_proxy.server.CtxProxy(ctx, common.patch_ctx) as proxy: env[ctx_proxy.client.CTX_SOCKET_URL] = proxy.socket_url running_process = subprocess.Popen( command, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a95e2f9f/aria/orchestrator/execution_plugin/ssh/operations.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ssh/operations.py b/aria/orchestrator/execution_plugin/ssh/operations.py index 7589d42..f240beb 100644 --- a/aria/orchestrator/execution_plugin/ssh/operations.py +++ b/aria/orchestrator/execution_plugin/ssh/operations.py @@ -70,14 +70,13 @@ def run_script(ctx, script_path, fabric_env, process, use_sudo, hide_output, **k paths.remote_work_dir)) # this file has to be present before using ctx fabric.api.put(_PROXY_CLIENT_PATH, paths.remote_ctx_path) - _patch_ctx(ctx) process = common.create_process_config( script_path=paths.remote_script_path, process=process, operation_kwargs=kwargs, quote_json_env_vars=True) fabric.api.put(paths.local_script_path, paths.remote_script_path) - with ctx_proxy.server.CtxProxy(ctx) as proxy: + with ctx_proxy.server.CtxProxy(ctx, _patch_ctx) as proxy: local_port = proxy.port with fabric.context_managers.cd(process.get('cwd', paths.remote_work_dir)): # pylint: disable=not-context-manager with tunnel.remote(ctx, local_port=local_port) as remote_port: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a95e2f9f/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py index 16b22e3..6c59986 100644 --- a/aria/orchestrator/workflows/executor/thread.py +++ b/aria/orchestrator/workflows/executor/thread.py @@ -17,7 +17,6 @@ Thread based executor """ -import time import Queue import threading @@ -59,9 +58,6 @@ class ThreadExecutor(BaseExecutor): self._task_started(task) try: task_func = imports.load_attribute(task.implementation) - # Some of the changes (mainly the logs fail to propagate if not enough time - # is given - time.sleep(0.1) task_func(ctx=task.context, **task.inputs) self._task_succeeded(task) except BaseException as e: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a95e2f9f/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index 5224496..a94fc83 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -30,7 +30,8 @@ from aria.orchestrator.execution_plugin import constants from aria.orchestrator.workflows.executor import process from aria.orchestrator.workflows.core import engine -from tests import mock, storage +from tests import mock +from tests import storage from tests.orchestrator.workflows.helpers import events_collector IS_WINDOWS = os.name == 'nt'
