Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-106-Create-sqla-logging-handler 248f1b11b -> 6e0f13ae2
added safe commit mechanism and finetuned test Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/6e0f13ae Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/6e0f13ae Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/6e0f13ae Branch: refs/heads/ARIA-106-Create-sqla-logging-handler Commit: 6e0f13ae251f0c3300170b1c676c9056ad45fe34 Parents: 248f1b1 Author: mxmrlv <[email protected]> Authored: Wed Feb 22 12:11:16 2017 +0200 Committer: mxmrlv <[email protected]> Committed: Wed Feb 22 12:11:16 2017 +0200 ---------------------------------------------------------------------- aria/__init__.py | 8 +++- aria/orchestrator/workflows/core/engine.py | 14 +++---- aria/utils/imports.py | 22 +++++++++++ tests/orchestrator/context/test_operation.py | 47 ++++++++++++----------- 4 files changed, 59 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6e0f13ae/aria/__init__.py ---------------------------------------------------------------------- diff --git a/aria/__init__.py b/aria/__init__.py index 43529f0..bf67009 100644 --- a/aria/__init__.py +++ b/aria/__init__.py @@ -17,7 +17,11 @@ Aria top level package """ -import pkgutil +import sys +if sys.version_info < (2, 7): + from utils.imports import iter_modules +else: + from pkgutil import iter_modules try: import pkg_resources @@ -48,7 +52,7 @@ def install_aria_extensions(): :code:`aria_extension` entry points and loads them. It then invokes all registered extension functions. """ - for loader, module_name, _ in pkgutil.iter_modules(): + for loader, module_name, _ in iter_modules(): if module_name.startswith('aria_extension_'): loader.find_module(module_name).load_module(module_name) if pkg_resources: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6e0f13ae/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index b26a69d..c6ac2b3 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -51,7 +51,7 @@ class Engine(logger.LoggerMixin): execute the workflow """ try: - self._signal(events.start_workflow_signal) + events.start_workflow_signal.send(self._workflow_context) while True: cancel = self._is_cancel() if cancel: @@ -65,11 +65,12 @@ class Engine(logger.LoggerMixin): else: time.sleep(0.1) if cancel: - self._signal(events.on_cancelled_workflow_signal) + events.on_cancelled_workflow_signal.send(self._workflow_context) else: - self._signal(events.on_success_workflow_signal) + events.on_success_workflow_signal.send(self._workflow_context) except BaseException as e: - self._signal(events.on_failure_workflow_signal, exception=e) + + events.on_failure_workflow_signal.send(self._workflow_context, exception=e) raise def cancel_execution(self): @@ -78,10 +79,7 @@ class Engine(logger.LoggerMixin): will be modified to 'cancelling' status. If execution is in pending mode, execution status will be modified to 'cancelled' directly. """ - self._signal(events.on_cancelling_workflow_signal) - - def _signal(self, signal, **kwargs): - signal.send(self._workflow_context, **kwargs) + events.on_cancelling_workflow_signal.send(self._workflow_context) def _is_cancel(self): return self._workflow_context.execution.status in [model.Execution.CANCELLING, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6e0f13ae/aria/utils/imports.py ---------------------------------------------------------------------- diff --git a/aria/utils/imports.py b/aria/utils/imports.py index e9c164e..6193ce1 100644 --- a/aria/utils/imports.py +++ b/aria/utils/imports.py @@ -18,6 +18,7 @@ Utility methods for dynamically loading python code """ import importlib +import pkgutil def import_fullname(name, paths=None): @@ -76,3 +77,24 @@ def load_attribute(attribute_path): except AttributeError: # TODO: handle raise + + +class _SafeModuleImporter(object): + def __init__(self): + self._yielded = {} + + def iter_modules(self): + # apparently pkgutil had some issues in python 2.6. Accessing any root level directories failed. + # and it got the entire process of importing fail. Since we only need any aria_extension related + # loading, in the meantime we could try to import only those (and assume they are not located at + # the root level. [In python 2.7 it does actually ignore any OSError. + for importer in pkgutil.iter_importers(): + try: + for module_name, ispkg in pkgutil.iter_importer_modules(importer): + if module_name not in self._yielded: + self._yielded[module_name] = True + yield importer, module_name, ispkg + except OSError: + pass + +iter_modules = _SafeModuleImporter().iter_modules http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6e0f13ae/tests/orchestrator/context/test_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py index 3001f56..331eaa4 100644 --- a/tests/orchestrator/context/test_operation.py +++ b/tests/orchestrator/context/test_operation.py @@ -19,6 +19,7 @@ import logging import tempfile import pytest +import time from aria.orchestrator.workflows.executor import process, thread @@ -71,18 +72,6 @@ def thread_executor(): result.close() [email protected](params=[ - (thread.ThreadExecutor()), - (process.ProcessExecutor(python_path=tests.ROOT_DIR)) -]) -def executor(request): - ex = request.param - try: - yield ex - finally: - ex.close() - - def test_node_operation_task_execution(ctx, thread_executor): operation_name = 'aria.interfaces.lifecycle.create' @@ -238,6 +227,18 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir): assert expected_file.read() == content [email protected](params=[ + (thread.ThreadExecutor()), + (process.ProcessExecutor(python_path=tests.ROOT_DIR)) +]) +def executor(request): + ex = request.param + try: + yield ex + finally: + ex.close() + + def test_operation_logging(ctx, executor): operation_name = 'aria.interfaces.lifecycle.create' @@ -280,22 +281,24 @@ def test_operation_logging(ctx, executor): assert op_start_log.created_at < op_end_log.created_at - with open(tmp_file, 'r') as f: - logs = [l.strip() for l in f.readlines()] - - assert inputs['op_start'] in logs - assert inputs['op_end'] in logs + # with open(tmp_file, 'r') as f: + # logs = [l.strip() for l in f.readlines()] + # + # assert inputs['op_start'] in logs + # assert inputs['op_end'] in logs -class MockLogHandler(logging.Handler): - def emit(self, record): - with open(tmp_file, 'a+') as f: - f.write(record.msg + '\n') +# class MockLogHandler(logging.Handler): +# def emit(self, record): +# with open(tmp_file, 'a+') as f: +# f.write(record.msg + '\n') -@operation(logging_handlers=[MockLogHandler()]) +@operation #(logging_handlers=[MockLogHandler()]) def logged_operation(ctx, **_): ctx.logger.info(ctx.task.inputs['op_start']) + # enables to check the relation between the created_at field properly + time.sleep(1) ctx.logger.debug(ctx.task.inputs['op_end'])
