Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-117-Log-model-should-have-an-Task-field 84d35fa33 -> 174f16edf (forced update)
removed sleep, and added some complexity to the caching mechanism Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/174f16ed Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/174f16ed Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/174f16ed Branch: refs/heads/ARIA-117-Log-model-should-have-an-Task-field Commit: 174f16edf72bb2a8dd7498c9ed16f392dc12d3cb Parents: 62f0141 Author: max-orlov <[email protected]> Authored: Sun Mar 12 16:32:08 2017 +0200 Committer: max-orlov <[email protected]> Committed: Sun Mar 12 16:36:52 2017 +0200 ---------------------------------------------------------------------- aria/orchestrator/context/operation.py | 13 ++++++++++++- aria/orchestrator/workflows/executor/thread.py | 4 ---- 2 files changed, 12 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/174f16ed/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index ed0791c..212c45a 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -17,6 +17,8 @@ Workflow and operation contexts """ +import threading + import aria from aria.utils import file from .common import BaseContext @@ -47,6 +49,7 @@ class BaseOperationContext(BaseContext): self._task = None self._execution_id = execution_id self._register_logger(task_id=self.task.id) + self._current_thread = self._current_thread_id def __repr__(self): details = 'implementation={task.implementation}; ' \ @@ -64,11 +67,19 @@ class BaseOperationContext(BaseContext): The task in the model storage :return: Task model """ - if not self._task: + # SQLAlchemy prevents from accessing an object which was created on a different thread. + # So we retrieve the object from the storage if the current thread isn't the same as the + # original thread. + if not self._task or self._current_thread_id != self._current_thread: self._task = self.model.task.get(self._task_id) + self._current_thread = self._current_thread_id return self._task @property + def _current_thread_id(self): + return threading.current_thread().ident + + @property def plugin_workdir(self): """ A work directory that is unique to the plugin and the deployment id http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/174f16ed/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py index 16b22e3..6c59986 100644 --- a/aria/orchestrator/workflows/executor/thread.py +++ b/aria/orchestrator/workflows/executor/thread.py @@ -17,7 +17,6 @@ Thread based executor """ -import time import Queue import threading @@ -59,9 +58,6 @@ class ThreadExecutor(BaseExecutor): self._task_started(task) try: task_func = imports.load_attribute(task.implementation) - # Some of the changes (mainly the logs fail to propagate if not enough time - # is given - time.sleep(0.1) task_func(ctx=task.context, **task.inputs) self._task_succeeded(task) except BaseException as e:
