wip
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/67b75425 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/67b75425 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/67b75425 Branch: refs/heads/ARIA-106-Create-sqla-logging-handler Commit: 67b754254760db0f1276bc10a293c80c1f5f8a54 Parents: b619335 Author: mxmrlv <[email protected]> Authored: Mon Feb 13 12:28:27 2017 +0200 Committer: mxmrlv <[email protected]> Committed: Mon Feb 13 20:09:21 2017 +0200 ---------------------------------------------------------------------- aria/__init__.py | 3 +- aria/logger.py | 42 ++++++++++- aria/orchestrator/context/common.py | 22 +++++- aria/orchestrator/workflows/core/engine.py | 5 ++ aria/orchestrator/workflows/executor/process.py | 1 - aria/storage/__init__.py | 6 -- aria/storage/core.py | 16 +++-- aria/storage/modeling/model.py | 4 ++ aria/storage/modeling/orchestrator_elements.py | 12 ++++ tests/mock/topology.py | 2 +- tests/orchestrator/context/test_operation.py | 74 ++++++++++++++++++-- tests/storage/__init__.py | 23 +++--- tests/storage/test_instrumentation.py | 6 +- 13 files changed, 176 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/__init__.py ---------------------------------------------------------------------- diff --git a/aria/__init__.py b/aria/__init__.py index 18eaa56..43529f0 100644 --- a/aria/__init__.py +++ b/aria/__init__.py @@ -97,7 +97,8 @@ def application_model_storage(api, api_kwargs=None, initiator=None, initiator_kw storage.modeling.model.ServiceInstanceUpdateStep, storage.modeling.model.ServiceInstanceModification, storage.modeling.model.Plugin, - storage.modeling.model.Task + storage.modeling.model.Task, + storage.modeling.model.Log ] return storage.ModelStorage(api_cls=api, api_kwargs=api_kwargs, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/logger.py ---------------------------------------------------------------------- diff --git a/aria/logger.py b/aria/logger.py index 0002cb5..f19d286 100644 --- a/aria/logger.py +++ b/aria/logger.py @@ -17,8 +17,10 @@ Logging related mixins and functions """ + import logging -from logging.handlers import RotatingFileHandler +from logging import handlers as logging_handlers +from datetime import datetime _base_logger = logging.getLogger('aria') @@ -124,7 +126,7 @@ def create_file_log_handler( """ Create a logging.handlers.RotatingFileHandler """ - rotating_file = RotatingFileHandler( + rotating_file = logging_handlers.RotatingFileHandler( filename=file_path, maxBytes=max_bytes, backupCount=backup_count, @@ -135,5 +137,41 @@ def create_file_log_handler( return rotating_file +class SQLAlchemyHandler(logging.Handler): + def __init__(self, session, engine, log_cls, **kwargs): + self._session = session + self._engine = engine + self._cls = log_cls + super(SQLAlchemyHandler, self).__init__(**kwargs) + + def emit(self, record): + log = self._cls( + logger=record.name, + level=record.levelname, + msg=record.msg, + created_at=datetime.utcnow() + ) + self._session.add(log) + self._session.commit() + + +class _SQLAlchemyHandlerFactory(object): + from aria.storage.model import Log + + def __init__(self): + self._handler = None + + def __call__(self, session, engine, model_cls=Log, level=logging.DEBUG): + if self._handler is None or not self._is_eq(session, engine, model_cls): + self._handler = SQLAlchemyHandler(session, engine, model_cls, level=level) + return self._handler + + def _is_eq(self, session, engine, model_cls): + return all([self._handler._session == session, + self._handler._engine == engine, + self._handler._cls == model_cls]) + +create_sqla_log_handler = _SQLAlchemyHandlerFactory() + _default_file_formatter = logging.Formatter( '%(asctime)s [%(name)s:%(levelname)s] %(message)s <%(pathname)s:%(lineno)d>') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 37482cf..d5ef42c 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -17,13 +17,15 @@ A common context for both workflow and operation """ from uuid import uuid4 +import logging + import jinja2 from aria import logger from aria.storage import exceptions -class BaseContext(logger.LoggerMixin): +class BaseContext(object): """ Base context object for workflow and operation """ @@ -34,6 +36,7 @@ class BaseContext(logger.LoggerMixin): service_instance_id, model_storage, resource_storage, + ctx_logger=None, workdir=None, **kwargs): super(BaseContext, self).__init__(**kwargs) @@ -42,8 +45,21 @@ class BaseContext(logger.LoggerMixin): self._model = model_storage self._resource = resource_storage self._service_instance_id = service_instance_id + self._logger = self._init_logger(ctx_logger) self._workdir = workdir + def _init_logger(self, ctx_logger=None): + ctx_logger = ctx_logger or logging.getLogger('aria_ctx') + + # A handler should be registered only once. + sqla_handler = logger.create_sqla_log_handler(**self._model.all_api_kwargs) + if sqla_handler not in ctx_logger.handlers: + ctx_logger.addHandler(sqla_handler) + + ctx_logger.setLevel(logging.DEBUG) + + return ctx_logger + def __repr__(self): return ( '{name}(name={self.name}, ' @@ -51,6 +67,10 @@ class BaseContext(logger.LoggerMixin): .format(name=self.__class__.__name__, self=self)) @property + def logger(self): + return self._logger + + @property def model(self): """ Access to the model storage http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 55b4159..7148dd1 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -71,6 +71,11 @@ class Engine(logger.LoggerMixin): except BaseException as e: events.on_failure_workflow_signal.send(self._workflow_context, exception=e) raise + finally: + # Each context creates its own handlers an assign them to the logger. + # This enables easy serialization. In order the handlers would not overlap, we + # need to clear them each execution. + self._workflow_context.logger.handlers = [] def cancel_execution(self): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 560ac43..acc0828 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -324,7 +324,6 @@ def _main(): # This is required for the instrumentation work properly. # See docstring of `remove_mutable_association_listener` for further details storage_type.remove_mutable_association_listener() - with instrumentation.track_changes() as instrument: try: ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context']) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/storage/__init__.py ---------------------------------------------------------------------- diff --git a/aria/storage/__init__.py b/aria/storage/__init__.py index eaadc7e..b76bdf2 100644 --- a/aria/storage/__init__.py +++ b/aria/storage/__init__.py @@ -42,12 +42,6 @@ from .core import ( ModelStorage, ResourceStorage, ) -from .modeling import ( - structure, - model, - model_base, - type -) from . import ( exceptions, api, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/storage/core.py ---------------------------------------------------------------------- diff --git a/aria/storage/core.py b/aria/storage/core.py index 0e189e6..7d70070 100644 --- a/aria/storage/core.py +++ b/aria/storage/core.py @@ -84,6 +84,12 @@ class Storage(LoggerMixin): self.logger.debug('{name} object is ready: {0!r}'.format( self, name=self.__class__.__name__)) + @property + def all_api_kwargs(self): + kwargs = self._api_kwargs.copy() + kwargs.update(self._additional_api_kwargs) + return kwargs + def __repr__(self): return '{name}(api={self.api})'.format(name=self.__class__.__name__, self=self) @@ -121,9 +127,7 @@ class ResourceStorage(Storage): :param name: :return: """ - kwargs = self._api_kwargs.copy() - kwargs.update(self._additional_api_kwargs) - self.registered[name] = self.api(name=name, **kwargs) + self.registered[name] = self.api(name=name, **self.all_api_kwargs) self.registered[name].create() self.logger.debug('setup {name} in storage {self!r}'.format(name=name, self=self)) @@ -148,9 +152,9 @@ class ModelStorage(Storage): self.logger.debug('{name} in already storage {self!r}'.format(name=model_name, self=self)) return - kwargs = self._api_kwargs.copy() - kwargs.update(self._additional_api_kwargs) - self.registered[model_name] = self.api(name=model_name, model_cls=model_cls, **kwargs) + self.registered[model_name] = self.api(name=model_name, + model_cls=model_cls, + **self.all_api_kwargs) self.registered[model_name].create() self.logger.debug('setup {name} in storage {self!r}'.format(name=model_name, self=self)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/storage/modeling/model.py ---------------------------------------------------------------------- diff --git a/aria/storage/modeling/model.py b/aria/storage/modeling/model.py index 62b90b3..cf7d933 100644 --- a/aria/storage/modeling/model.py +++ b/aria/storage/modeling/model.py @@ -216,4 +216,8 @@ class Plugin(aria_declarative_base, orchestrator_elements.PluginBase): class Task(aria_declarative_base, orchestrator_elements.TaskBase): pass + + +class Log(aria_declarative_base, orchestrator_elements.LogBase): + pass # endregion http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/aria/storage/modeling/orchestrator_elements.py ---------------------------------------------------------------------- diff --git a/aria/storage/modeling/orchestrator_elements.py b/aria/storage/modeling/orchestrator_elements.py index 5f7a3f2..8efc147 100644 --- a/aria/storage/modeling/orchestrator_elements.py +++ b/aria/storage/modeling/orchestrator_elements.py @@ -466,3 +466,15 @@ class TaskBase(ModelMixin): @staticmethod def retry(message=None, retry_interval=None): raise TaskRetryException(message, retry_interval=retry_interval) + + +class LogBase(ModelMixin): + __tablename__ = 'log' + + logger = Column(String) + level = Column(String) + msg = Column(String) + created_at = Column(DateTime, index=True) + + def __repr__(self): + return "<Log: {0} - {1}>".format(self.created_at, self.msg[:50]) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/tests/mock/topology.py ---------------------------------------------------------------------- diff --git a/tests/mock/topology.py b/tests/mock/topology.py index b04fb46..d3e8b7b 100644 --- a/tests/mock/topology.py +++ b/tests/mock/topology.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from aria.storage import model +from aria.storage.modeling import model from . import models http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/tests/orchestrator/context/test_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py index 3f39979..08d360c 100644 --- a/tests/orchestrator/context/test_operation.py +++ b/tests/orchestrator/context/test_operation.py @@ -38,8 +38,7 @@ global_test_holder = {} @pytest.fixture def ctx(tmpdir): context = mock.context.simple( - str(tmpdir.join('workdir')), - inmemory=True, + str(tmpdir), context_kwargs=dict(workdir=str(tmpdir.join('workdir'))) ) yield context @@ -61,7 +60,7 @@ def test_node_operation_task_execution(ctx, executor): node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME) interface = mock.models.get_interface( operation_name, - operation_kwargs=dict(implementation=op_path(my_operation, module_path=__name__)) + operation_kwargs=dict(implementation=op_path(basic_operation, module_path=__name__)) ) node.interfaces = [interface] ctx.model.node.update(node) @@ -102,7 +101,7 @@ def test_relationship_operation_task_execution(ctx, executor): interface = mock.models.get_interface( operation_name=operation_name, - operation_kwargs=dict(implementation=op_path(my_operation, module_path=__name__)), + operation_kwargs=dict(implementation=op_path(basic_operation, module_path=__name__)), edge='source' ) @@ -210,8 +209,73 @@ def test_plugin_workdir(ctx, executor, tmpdir): assert expected_file.read() == content +def test_operation_logging(ctx, executor): + operation_name = 'aria.interfaces.lifecycle.create' + + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME) + interface = mock.models.get_interface( + operation_name, + operation_kwargs=dict(implementation=op_path(logged_operation, module_path=__name__)) + ) + node.interfaces = [interface] + ctx.model.node.update(node) + + wf_start = 'wf_start' + wf_end = 'wf_end' + + inputs = { + 'op_start': 'op_start', + 'op_end': 'op_end', + } + + @workflow + def basic_workflow(graph, ctx, **_): + ctx.logger.info(wf_start) + graph.add_tasks( + api.task.OperationTask.node( + name=operation_name, + instance=node, + inputs=inputs + ) + ) + ctx.logger.debug(wf_end) + + execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor) + + op_start_log = ctx.model.log.list(filters=dict(msg=inputs['op_start'])) + assert len(op_start_log) == 1 + op_start_log = op_start_log[0] + assert op_start_log.level.lower() == 'info' + + op_end_log = ctx.model.log.list(filters=dict(msg=inputs['op_end'])) + assert len(op_end_log) == 1 + op_end_log = op_end_log[0] + assert op_end_log.level.lower() == 'debug' + + wf_start_log = ctx.model.log.list(filters=dict(msg=wf_start)) + assert len(wf_start_log) == 1 + wf_start_log = wf_start_log[0] + assert wf_start_log.level.lower() == 'info' + + wf_end_log = ctx.model.log.list(filters=dict(msg=wf_end)) + assert len(wf_end_log) == 1 + wf_end_log = wf_end_log[0] + assert wf_end_log.level.lower() == 'debug' + + assert (wf_start_log.created_at < + wf_end_log.created_at < + op_start_log.created_at < + op_end_log.created_at) + + +@operation +def logged_operation(ctx, **_): + ctx.logger.info(ctx.task.inputs['op_start']) + ctx.logger.debug(ctx.task.inputs['op_end']) + + @operation -def my_operation(ctx, **_): +def basic_operation(ctx, **_): global_test_holder[ctx.name] = ctx http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/tests/storage/__init__.py ---------------------------------------------------------------------- diff --git a/tests/storage/__init__.py b/tests/storage/__init__.py index 4278831..5323d01 100644 --- a/tests/storage/__init__.py +++ b/tests/storage/__init__.py @@ -12,8 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import os -import platform + from shutil import rmtree from tempfile import mkdtemp @@ -23,19 +22,19 @@ from sqlalchemy import ( Column, Text, Integer, - pool + pool, + MetaData ) -from aria.storage import ( +from aria.storage.modeling import ( model, - type as aria_type, structure, - modeling + type as aria_type ) -class MockModel(model.aria_declarative_base, structure.ModelMixin): #pylint: disable=abstract-method +class MockModel(structure.ModelMixin, model.aria_declarative_base): #pylint: disable=abstract-method __tablename__ = 'mock_model' model_dict = Column(aria_type.Dict) model_list = Column(aria_type.List) @@ -58,14 +57,8 @@ def release_sqlite_storage(storage): :param storage: :return: """ - mapis = storage.registered.values() - - if mapis: - for session in set(mapi._session for mapi in mapis): - session.rollback() - session.close() - for engine in set(mapi._engine for mapi in mapis): - model.aria_declarative_base.metadata.drop_all(engine) + storage.all_api_kwargs['session'].close() + MetaData(bind=storage.all_api_kwargs['engine']).drop_all() def init_inmemory_model_storage(): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/67b75425/tests/storage/test_instrumentation.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_instrumentation.py b/tests/storage/test_instrumentation.py index 08d5ae0..7f0eb02 100644 --- a/tests/storage/test_instrumentation.py +++ b/tests/storage/test_instrumentation.py @@ -17,13 +17,15 @@ import pytest from sqlalchemy import Column, Text, Integer, event from aria.storage import ( - structure, ModelStorage, sql_mapi, instrumentation, exceptions, +) +from aria.storage.modeling import ( + model, type as aria_type, - model + structure, ) from ..storage import release_sqlite_storage, init_inmemory_model_storage
