Repository: incubator-ariatosca Updated Branches: refs/heads/master c0d76adaf -> 95177d0f7
ARIA-117-Log-model-should-have-an-Task-field Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/95177d0f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/95177d0f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/95177d0f Branch: refs/heads/master Commit: 95177d0f7fdcedf9c32421e2557ddf965683525a Parents: c0d76ad Author: max-orlov <[email protected]> Authored: Mon Mar 6 16:43:58 2017 +0200 Committer: max-orlov <[email protected]> Committed: Mon Mar 13 13:18:08 2017 +0200 ---------------------------------------------------------------------- aria/logger.py | 1 + aria/orchestrator/context/common.py | 11 +++--- aria/orchestrator/context/operation.py | 16 ++++++--- aria/orchestrator/execution_plugin/common.py | 8 ++--- .../execution_plugin/ctx_proxy/server.py | 6 +++- aria/orchestrator/execution_plugin/local.py | 3 +- .../execution_plugin/ssh/operations.py | 3 +- aria/storage/modeling/orchestrator_elements.py | 8 +++++ tests/orchestrator/context/test_operation.py | 36 ++++++++++++++------ .../orchestrator/execution_plugin/test_local.py | 3 +- 10 files changed, 65 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/logger.py ---------------------------------------------------------------------- diff --git a/aria/logger.py b/aria/logger.py index 6f0b84a..42e3679 100644 --- a/aria/logger.py +++ b/aria/logger.py @@ -164,6 +164,7 @@ class _SQLAlchemyHandler(logging.Handler): '%Y-%m-%d %H:%M:%S,%f') log = self._cls( execution_fk=self._execution_id, + task_fk=record.task_id, actor=record.prefix, level=record.levelname, msg=record.msg, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff 
--git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 1d228b6..bb9d839 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -36,13 +36,15 @@ class BaseContext(object): """ class PrefixedLogger(object): - def __init__(self, logger, prefix=''): + def __init__(self, logger, prefix='', task_id=None): self._logger = logger self._prefix = prefix + self._task_id = task_id def __getattr__(self, item): if item.upper() in logging._levelNames: - return partial(getattr(self._logger, item), extra={'prefix': self._prefix}) + return partial(getattr(self._logger, item), + extra={'prefix': self._prefix, 'task_id': self._task_id}) else: return getattr(self._logger, item) @@ -74,9 +76,10 @@ class BaseContext(object): self.model.execution.put(execution) return execution.id - def _register_logger(self, logger_name=None, level=None): + def _register_logger(self, logger_name=None, level=None, task_id=None): self.logger = self.PrefixedLogger(logging.getLogger(logger_name or self.__class__.__name__), - self.logging_id) + self.logging_id, + task_id=task_id) self.logger.addHandler(aria_logger.create_console_log_handler()) self.logger.addHandler(self._get_sqla_handler()) self.logger.setLevel(level or logging.DEBUG) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index cbaa462..d2716e8 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -17,6 +17,8 @@ Workflow and operation contexts """ +import threading + import aria from aria.utils import file from .common import BaseContext @@ -44,9 +46,9 @@ class BaseOperationContext(BaseContext): **kwargs) self._task_id = task_id self._actor_id = actor_id - self._task = None + self._thread_local = 
threading.local() self._execution_id = execution_id - self._register_logger() + self._register_logger(task_id=self.task.id) def __repr__(self): details = 'implementation={task.implementation}; ' \ @@ -64,9 +66,13 @@ class BaseOperationContext(BaseContext): The task in the model storage :return: Task model """ - if not self._task: - self._task = self.model.task.get(self._task_id) - return self._task + # SQLAlchemy prevents from accessing an object which was created on a different thread. + # So we retrieve the object from the storage if the current thread isn't the same as the + # original thread. + + if not hasattr(self._thread_local, 'task'): + self._thread_local.task = self.model.task.get(self._task_id) + return self._thread_local.task @property def plugin_workdir(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/orchestrator/execution_plugin/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/common.py b/aria/orchestrator/execution_plugin/common.py index 47cb631..7915c47 100644 --- a/aria/orchestrator/execution_plugin/common.py +++ b/aria/orchestrator/execution_plugin/common.py @@ -106,8 +106,6 @@ def create_process_config(script_path, process, operation_kwargs, quote_json_env def patch_ctx(ctx): ctx._error = None task = ctx.task - task._original_abort = task.abort - task._original_retry = task.retry def _validate_legal_action(): if ctx._error is not None: @@ -133,14 +131,14 @@ def check_error(ctx, error_check_func=None, reraise=False): _error = ctx._error # this happens when a script calls task.abort/task.retry more than once if isinstance(_error, RuntimeError): - ctx.task._original_abort(str(_error)) + ctx.task.abort(str(_error)) # ScriptException is populated by the ctx proxy server when task.abort or task.retry # are called elif isinstance(_error, exceptions.ScriptException): if _error.retry: - ctx.task._original_retry(_error.message, 
_error.retry_interval) + ctx.task.retry(_error.message, _error.retry_interval) else: - ctx.task._original_abort(_error.message) + ctx.task.abort(_error.message) # local and ssh operations may pass an additional logic check for errors here if error_check_func: error_check_func() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/orchestrator/execution_plugin/ctx_proxy/server.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py index 2782ae3..817d064 100644 --- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py +++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py @@ -30,8 +30,9 @@ from .. import exceptions class CtxProxy(object): - def __init__(self, ctx): + def __init__(self, ctx, ctx_patcher=(lambda *args, **kwargs: None)): self.ctx = ctx + self._ctx_patcher = ctx_patcher self.port = _get_unused_port() self.socket_url = 'http://localhost:{0}'.format(self.port) self.server = None @@ -69,6 +70,9 @@ class CtxProxy(object): server.serve_forever(poll_interval=0.1) def serve(): + # Since task is a thread_local object, we need to patch it inside the server thread. 
+ self._ctx_patcher(self.ctx) + bottle_app = bottle.Bottle() bottle_app.post('/', callback=self._request_handler) bottle.run( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/orchestrator/execution_plugin/local.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/local.py b/aria/orchestrator/execution_plugin/local.py index bc2d661..121e582 100644 --- a/aria/orchestrator/execution_plugin/local.py +++ b/aria/orchestrator/execution_plugin/local.py @@ -75,8 +75,7 @@ def _execute_func(script_path, ctx, process, operation_kwargs): env = os.environ.copy() env.update(process['env']) ctx.logger.info('Executing: {0}'.format(command)) - common.patch_ctx(ctx) - with ctx_proxy.server.CtxProxy(ctx) as proxy: + with ctx_proxy.server.CtxProxy(ctx, common.patch_ctx) as proxy: env[ctx_proxy.client.CTX_SOCKET_URL] = proxy.socket_url running_process = subprocess.Popen( command, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/orchestrator/execution_plugin/ssh/operations.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ssh/operations.py b/aria/orchestrator/execution_plugin/ssh/operations.py index 7589d42..f240beb 100644 --- a/aria/orchestrator/execution_plugin/ssh/operations.py +++ b/aria/orchestrator/execution_plugin/ssh/operations.py @@ -70,14 +70,13 @@ def run_script(ctx, script_path, fabric_env, process, use_sudo, hide_output, **k paths.remote_work_dir)) # this file has to be present before using ctx fabric.api.put(_PROXY_CLIENT_PATH, paths.remote_ctx_path) - _patch_ctx(ctx) process = common.create_process_config( script_path=paths.remote_script_path, process=process, operation_kwargs=kwargs, quote_json_env_vars=True) fabric.api.put(paths.local_script_path, paths.remote_script_path) - with ctx_proxy.server.CtxProxy(ctx) as proxy: + with ctx_proxy.server.CtxProxy(ctx, _patch_ctx) as 
proxy: local_port = proxy.port with fabric.context_managers.cd(process.get('cwd', paths.remote_work_dir)): # pylint: disable=not-context-manager with tunnel.remote(ctx, local_port=local_port) as remote_port: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/aria/storage/modeling/orchestrator_elements.py ---------------------------------------------------------------------- diff --git a/aria/storage/modeling/orchestrator_elements.py b/aria/storage/modeling/orchestrator_elements.py index d06b5d0..ef773ed 100644 --- a/aria/storage/modeling/orchestrator_elements.py +++ b/aria/storage/modeling/orchestrator_elements.py @@ -479,6 +479,14 @@ class LogBase(ModelMixin): def execution(cls): return cls.many_to_one_relationship('execution') + @declared_attr + def task_fk(cls): + return cls.foreign_key('task', nullable=True) + + @declared_attr + def task(cls): + return cls.many_to_one_relationship('task') + level = Column(String) msg = Column(String) created_at = Column(DateTime, index=True) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/tests/orchestrator/context/test_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py index 8db2bc6..b49b1cb 100644 --- a/tests/orchestrator/context/test_operation.py +++ b/tests/orchestrator/context/test_operation.py @@ -49,12 +49,21 @@ def ctx(tmpdir): @pytest.fixture +def process_executor(): + ex = process.ProcessExecutor(**dict(python_path=tests.ROOT_DIR)) + try: + yield ex + finally: + ex.close() + + +@pytest.fixture def thread_executor(): - result = thread.ThreadExecutor() + ex = thread.ThreadExecutor() try: - yield result + yield ex finally: - result.close() + ex.close() def test_node_operation_task_execution(ctx, thread_executor): @@ -213,16 +222,16 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir): @pytest.fixture(params=[ - # 
(thread.ThreadExecutor, dict()), - (process.ProcessExecutor, dict(python_path=tests.ROOT_DIR)) + (thread.ThreadExecutor, {}), + (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}), ]) def executor(request): - ex_cls, kwargs = request.param - ex = ex_cls(**kwargs) + executor_cls, executor_kwargs = request.param + result = executor_cls(**executor_kwargs) try: - yield ex + yield result finally: - ex.close() + result.close() def test_node_operation_logging(ctx, executor): @@ -295,10 +304,17 @@ def _assert_loggins(ctx, inputs): assert len(executions) == 1 execution = executions[0] + tasks = ctx.model.task.list() + assert len(tasks) == 1 + task = tasks[0] + assert task.logs.count() == 4 + logs = ctx.model.log.list() assert len(logs) == execution.logs.count() == 6 - assert all(l in logs for l in execution.logs) + assert set(logs) == set(execution.logs) + assert all(l.execution == execution for l in logs) + assert all(l in logs and l.task == task for l in task.logs) op_start_log = [l for l in logs if inputs['op_start'] in l.msg and l.level.lower() == 'info'] assert len(op_start_log) == 1 http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95177d0f/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index 5224496..a94fc83 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -30,7 +30,8 @@ from aria.orchestrator.execution_plugin import constants from aria.orchestrator.workflows.executor import process from aria.orchestrator.workflows.core import engine -from tests import mock, storage +from tests import mock +from tests import storage from tests.orchestrator.workflows.helpers import events_collector IS_WINDOWS = os.name == 'nt'
