Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-117-Log-model-should-have-an-Task-field 73d5bce98 -> 62f01414a (forced update)
ARIA-117-Log-model-should-have-an-Task-field Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/62f01414 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/62f01414 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/62f01414 Branch: refs/heads/ARIA-117-Log-model-should-have-an-Task-field Commit: 62f01414a08efaa2e498f646a2213ae23a331c68 Parents: c0d76ad Author: max-orlov <[email protected]> Authored: Mon Mar 6 16:43:58 2017 +0200 Committer: max-orlov <[email protected]> Committed: Sun Mar 12 13:17:46 2017 +0200 ---------------------------------------------------------------------- aria/logger.py | 1 + aria/orchestrator/context/common.py | 11 ++++--- aria/orchestrator/context/operation.py | 2 +- aria/orchestrator/workflows/executor/thread.py | 4 +++ aria/storage/modeling/orchestrator_elements.py | 8 +++++ tests/orchestrator/context/test_operation.py | 36 +++++++++++++++------ 6 files changed, 47 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62f01414/aria/logger.py ---------------------------------------------------------------------- diff --git a/aria/logger.py b/aria/logger.py index 6f0b84a..42e3679 100644 --- a/aria/logger.py +++ b/aria/logger.py @@ -164,6 +164,7 @@ class _SQLAlchemyHandler(logging.Handler): '%Y-%m-%d %H:%M:%S,%f') log = self._cls( execution_fk=self._execution_id, + task_fk=record.task_id, actor=record.prefix, level=record.levelname, msg=record.msg, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62f01414/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 1d228b6..bb9d839 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -36,13 +36,15 @@ class BaseContext(object): """ class PrefixedLogger(object): - def __init__(self, logger, prefix=''): + def __init__(self, logger, prefix='', task_id=None): self._logger = logger self._prefix = prefix + self._task_id = task_id def __getattr__(self, item): if item.upper() in logging._levelNames: - return partial(getattr(self._logger, item), extra={'prefix': self._prefix}) + return partial(getattr(self._logger, item), + extra={'prefix': self._prefix, 'task_id': self._task_id}) else: return getattr(self._logger, item) @@ -74,9 +76,10 @@ class BaseContext(object): self.model.execution.put(execution) return execution.id - def _register_logger(self, logger_name=None, level=None): + def _register_logger(self, logger_name=None, level=None, task_id=None): self.logger = self.PrefixedLogger(logging.getLogger(logger_name or self.__class__.__name__), - self.logging_id) + self.logging_id, + task_id=task_id) self.logger.addHandler(aria_logger.create_console_log_handler()) self.logger.addHandler(self._get_sqla_handler()) self.logger.setLevel(level or logging.DEBUG) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62f01414/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index cbaa462..ed0791c 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -46,7 +46,7 @@ class BaseOperationContext(BaseContext): self._actor_id = actor_id self._task = None self._execution_id = execution_id - self._register_logger() + self._register_logger(task_id=self.task.id) def __repr__(self): details = 'implementation={task.implementation}; ' \ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62f01414/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py index 6c59986..16b22e3 100644 --- a/aria/orchestrator/workflows/executor/thread.py +++ b/aria/orchestrator/workflows/executor/thread.py @@ -17,6 +17,7 @@ Thread based executor """ +import time import Queue import threading @@ -58,6 +59,9 @@ class ThreadExecutor(BaseExecutor): self._task_started(task) try: task_func = imports.load_attribute(task.implementation) + # Some of the changes (mainly the logs fail to propagate if not enough time + # is given + time.sleep(0.1) task_func(ctx=task.context, **task.inputs) self._task_succeeded(task) except BaseException as e: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62f01414/aria/storage/modeling/orchestrator_elements.py ---------------------------------------------------------------------- diff --git a/aria/storage/modeling/orchestrator_elements.py b/aria/storage/modeling/orchestrator_elements.py index d06b5d0..ef773ed 100644 --- a/aria/storage/modeling/orchestrator_elements.py +++ b/aria/storage/modeling/orchestrator_elements.py @@ -479,6 +479,14 @@ class LogBase(ModelMixin): def execution(cls): return cls.many_to_one_relationship('execution') + @declared_attr + def task_fk(cls): + return cls.foreign_key('task', nullable=True) + + @declared_attr + def task(cls): + return cls.many_to_one_relationship('task') + level = Column(String) msg = Column(String) created_at = Column(DateTime, index=True) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62f01414/tests/orchestrator/context/test_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py index 8db2bc6..b49b1cb 100644 --- a/tests/orchestrator/context/test_operation.py +++ b/tests/orchestrator/context/test_operation.py @@ -49,12 +49,21 @@ def ctx(tmpdir): @pytest.fixture +def process_executor(): + ex = process.ProcessExecutor(**dict(python_path=tests.ROOT_DIR)) + try: + yield ex + finally: + ex.close() + + [email protected] def thread_executor(): - result = thread.ThreadExecutor() + ex = thread.ThreadExecutor() try: - yield result + yield ex finally: - result.close() + ex.close() def test_node_operation_task_execution(ctx, thread_executor): @@ -213,16 +222,16 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir): @pytest.fixture(params=[ - # (thread.ThreadExecutor, dict()), - (process.ProcessExecutor, dict(python_path=tests.ROOT_DIR)) + (thread.ThreadExecutor, {}), + (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}), ]) def executor(request): - ex_cls, kwargs = request.param - ex = ex_cls(**kwargs) + executor_cls, executor_kwargs = request.param + result = executor_cls(**executor_kwargs) try: - yield ex + yield result finally: - ex.close() + result.close() def test_node_operation_logging(ctx, executor): @@ -295,10 +304,17 @@ def _assert_loggins(ctx, inputs): assert len(executions) == 1 execution = executions[0] + tasks = ctx.model.task.list() + assert len(tasks) == 1 + task = tasks[0] + assert task.logs.count() == 4 + logs = ctx.model.log.list() assert len(logs) == execution.logs.count() == 6 - assert all(l in logs for l in execution.logs) + assert set(logs) == set(execution.logs) + assert all(l.execution == execution for l in logs) + assert all(l in logs and l.task == task for l in task.logs) op_start_log = [l for l in logs if inputs['op_start'] in l.msg and l.level.lower() == 'info'] assert len(op_start_log) == 1
