Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-138-Make-logging-more-informative f0d093b7a -> 59d5301fc 
(forced update)


ARIA-138-Make-logging-more-informative


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/59d5301f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/59d5301f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/59d5301f

Branch: refs/heads/ARIA-138-Make-logging-more-informative
Commit: 59d5301fc0598c9e27458388d02e89d7b9a3f2bc
Parents: 8e5a1ec
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed Apr 19 17:14:15 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Thu Apr 20 14:30:09 2017 +0300

----------------------------------------------------------------------
 aria/cli/commands/executions.py                 | 18 ++++-
 aria/cli/commands/logs.py                       | 23 +++---
 aria/cli/execution_logging.py                   | 78 ++++++++++++++++++++
 aria/cli/logger.py                              | 18 +++++
 aria/logger.py                                  |  4 +-
 aria/modeling/orchestration.py                  | 11 ++-
 aria/orchestrator/context/common.py             | 47 ++++++------
 aria/orchestrator/context/operation.py          | 29 +-------
 aria/orchestrator/context/workflow.py           |  4 +-
 .../execution_plugin/instantiation.py           |  4 +-
 aria/orchestrator/workflow_runner.py            |  6 +-
 aria/orchestrator/workflows/core/engine.py      |  1 +
 aria/orchestrator/workflows/events_logging.py   | 35 ++++++---
 aria/orchestrator/workflows/executor/base.py    |  4 +-
 aria/orchestrator/workflows/executor/dry.py     | 18 +++--
 aria/orchestrator/workflows/executor/process.py | 15 ++--
 aria/orchestrator/workflows/executor/thread.py  |  8 +-
 tests/.pylintrc                                 |  2 +-
 .../orchestrator/workflows/executor/__init__.py | 51 +++++++++++++
 .../workflows/executor/test_executor.py         | 64 +++-------------
 .../workflows/executor/test_process_executor.py | 37 +---------
 21 files changed, 281 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/aria/cli/commands/executions.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index e100f0d..99dc206 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -18,6 +18,8 @@ import os
 from .. import helptexts
 from .. import table
 from .. import utils
+from .. import logger as cli_logger
+from .. import execution_logging
 from ..core import aria
 from ...modeling.models import Execution
 from ...orchestrator.workflow_runner import WorkflowRunner
@@ -141,12 +143,19 @@ def start(workflow_name,
 
     logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if 
dry else ''))
     execution_thread.start()
+
+    log_iterator = cli_logger.ModelLogIterator(model_storage, 
workflow_runner.execution_id)
     try:
         while execution_thread.is_alive():
-            # using join without a timeout blocks and ignores KeyboardInterrupt
+            execution_logging.log_list(log_iterator)
             execution_thread.join(1)
+
     except KeyboardInterrupt:
-        _cancel_execution(workflow_runner, execution_thread, logger)
+        _cancel_execution(model_storage, workflow_runner, execution_thread, 
logger)
+
+    # It might be the case where some logs were written and the execution was 
terminated, thus we
+    # need to drain the remaining logs.
+    execution_logging.log_list(log_iterator)
 
     # raise any errors from the execution thread (note these are not workflow 
execution errors)
     execution_thread.raise_error_if_exists()
@@ -161,12 +170,15 @@ def start(workflow_name,
         model_storage.execution.delete(execution)
 
 
-def _cancel_execution(workflow_runner, execution_thread, logger):
+def _cancel_execution(model_storage, workflow_runner, execution_thread, 
logger):
     logger.info('Cancelling execution. Press Ctrl+C again to force-cancel')
+    log_iterator = cli_logger.ModelLogIterator(model_storage, 
workflow_runner.execution_id)
     try:
         workflow_runner.cancel()
         while execution_thread.is_alive():
+            execution_logging.log_list(log_iterator)
             execution_thread.join(1)
     except KeyboardInterrupt:
         logger.info('Force-cancelling execution')
         # TODO handle execution (update status etc.) and exit process
+

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/aria/cli/commands/logs.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/logs.py b/aria/cli/commands/logs.py
index 6c83347..d29bb9c 100644
--- a/aria/cli/commands/logs.py
+++ b/aria/cli/commands/logs.py
@@ -12,13 +12,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-from .. import utils
-from ..core import aria
+from ..logger import ModelLogIterator
+from .. core import aria
+from .. import execution_logging
 
 
 @aria.group(name='logs')
-@aria.options.verbose()
 def logs():
     """Show logs from workflow executions
     """
@@ -31,19 +30,15 @@ def logs():
 @aria.options.verbose()
 @aria.pass_model_storage
 @aria.pass_logger
-def list(execution_id,
-         model_storage,
-         logger):
+def list(execution_id, model_storage, logger):
     """Display logs for an execution
     """
     logger.info('Listing logs for execution id {0}'.format(execution_id))
-    logs_list = model_storage.log.list(filters=dict(execution_fk=execution_id),
-                                       
sort=utils.storage_sort_param('created_at', False))
-    # TODO: print logs nicely
-    if logs_list:
-        for log in logs_list:
-            logger.info(log)
-    else:
+    log_iterator = ModelLogIterator(model_storage, execution_id)
+
+    any_logs = execution_logging.log_list(log_iterator)
+
+    if not any_logs:
         logger.info('\tNo logs')
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/aria/cli/execution_logging.py
----------------------------------------------------------------------
diff --git a/aria/cli/execution_logging.py b/aria/cli/execution_logging.py
new file mode 100644
index 0000000..8baf6d7
--- /dev/null
+++ b/aria/cli/execution_logging.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from StringIO import StringIO
+
+from . import logger
+from .env import env
+
+DEFAULT_FORMATTING = {
+    logger.NO_VERBOSE: {'message': '{item.msg}'},
+    logger.LOW_VERBOSE: {
+        'message': '{timestamp} | {item.level[0]} | {item.msg}',
+        'timestamp': '%H:%M:%S'
+    },
+    logger.MEDIUM_VERBOSE: {
+        'message': '{timestamp} | {item.level[0]} | {implementation} | 
{item.msg} ',
+        'timestamp': '%H:%M:%S'
+    },
+    logger.HIGH_VERBOSE: {
+        'message': '{timestamp} | {item.level[0]} | {implementation}({inputs}) 
| {item.msg} ',
+        'timestamp': '%H:%M:%S'
+    },
+}
+
+
+def _str(item, formats=None):
+    # If no formats are passed we revert to the default formats (per level)
+    formats = formats or {}
+    formatting = formats.get(env.logging.verbosity_level,
+                             DEFAULT_FORMATTING[env.logging.verbosity_level])
+    msg = StringIO()
+
+    formatting_kwargs = dict(item=item)
+
+    if item.task:
+        formatting_kwargs['implementation'] = item.task.implementation
+        formatting_kwargs['inputs'] = dict(i.unwrap() for i in 
item.task.inputs.values())
+    else:
+        formatting_kwargs['implementation'] = item.execution.workflow_name
+        formatting_kwargs['inputs'] = dict(i.unwrap() for i in 
item.execution.inputs.values())
+
+    if 'timestamp' in formatting:
+        formatting_kwargs['timestamp'] = 
item.created_at.strftime(formatting['timestamp'])
+    else:
+        formatting_kwargs['timestamp'] = item.created_at
+
+    msg.write(formatting['message'].format(**formatting_kwargs))
+
+    # Add the exception and the error msg.
+    if item.traceback and env.logging.verbosity_level >= logger.MEDIUM_VERBOSE:
+        for line in item.traceback.splitlines(True):
+            msg.write('\t' + '|' + line)
+
+    return msg.getvalue()
+
+
+def log(item, *args, **kwargs):
+    return getattr(env.logging.logger, item.level.lower())(_str(item), *args, 
**kwargs)
+
+
+def log_list(iterator):
+    any_logs = False
+    for item in iterator:
+        log(item)
+        any_logs = True
+    return any_logs

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/aria/cli/logger.py
----------------------------------------------------------------------
diff --git a/aria/cli/logger.py b/aria/cli/logger.py
index 1ffa918..c240f02 100644
--- a/aria/cli/logger.py
+++ b/aria/cli/logger.py
@@ -112,3 +112,21 @@ class Logging(object):
             log.setLevel(level)
 
         dictconfig.dictConfig(logger_dict)
+
+
+class ModelLogIterator(object):
+
+    def __init__(self, model_storage, execution_id, filters=None, sort=None):
+        self._last_visited_id = 0
+        self._model_storage = model_storage
+        self._execution_id = execution_id
+        self._additional_filters = filters or {}
+        self._sort = sort or {}
+
+    def __iter__(self):
+        filters = dict(execution_fk=self._execution_id, 
id=dict(gt=self._last_visited_id))
+        filters.update(self._additional_filters)
+
+        for log in self._model_storage.log.iter(filters=filters, 
sort=self._sort):
+            self._last_visited_id = log.id
+            yield log

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index dd54264..d6a06d0 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -177,10 +177,12 @@ class _SQLAlchemyHandler(logging.Handler):
         log = self._cls(
             execution_fk=self._execution_id,
             task_fk=record.task_id,
-            actor=record.prefix,
             level=record.levelname,
             msg=str(record.msg),
             created_at=created_at,
+
+            # Not mandatory.
+            traceback=getattr(record, 'traceback', None)
         )
         self._session.add(log)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 01ab2e8..b9a75e9 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -413,7 +413,9 @@ class LogBase(ModelMixin):
     level = Column(String)
     msg = Column(String)
     created_at = Column(DateTime, index=True)
-    actor = Column(String)
+
+    # In case of failed execution
+    traceback = Column(Text)
 
     # region foreign keys
 
@@ -427,6 +429,9 @@ class LogBase(ModelMixin):
 
     # endregion
 
+    def __str__(self):
+        return self.msg
+
     def __repr__(self):
-        return "<{self.created_at}: [{self.level}] @{self.actor}> 
{msg}".format(
-            self=self, msg=self.msg[:50])
+        name = (self.task.actor if self.task else self.execution).name
+        return '{name}: {self.msg}'.format(name=name, self=self)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py 
b/aria/orchestrator/context/common.py
index 15843db..64ef9a4 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -38,43 +38,44 @@ class BaseContext(object):
     """
 
     class PrefixedLogger(object):
-        def __init__(self, logger, prefix='', task_id=None):
-            self._logger = logger
-            self._prefix = prefix
+        def __init__(self, base_logger, task_id=None):
+            self._logger = base_logger
             self._task_id = task_id
 
-        def __getattr__(self, item):
-            if item.upper() in logging._levelNames:
-                return partial(getattr(self._logger, item),
-                               extra={'prefix': self._prefix, 'task_id': 
self._task_id})
+        def __getattr__(self, attribute):
+            if attribute.upper() in logging._levelNames:
+                return partial(self._logger_with_task_id, _level=attribute)
             else:
-                return getattr(self._logger, item)
-
-    def __init__(
-            self,
-            name,
-            service_id,
-            execution_id,
-            model_storage,
-            resource_storage,
-            workdir=None,
-            **kwargs):
+                return getattr(self._logger, attribute)
+
+        def _logger_with_task_id(self, *args, **kwargs):
+            level = kwargs.pop('_level')
+            kwargs.setdefault('extra', {})['task_id'] = self._task_id
+            return getattr(self._logger, level)(*args, **kwargs)
+
+    def __init__(self,
+                 name,
+                 service_id,
+                 model_storage,
+                 resource_storage,
+                 execution_id,
+                 workdir=None,
+                 **kwargs):
         super(BaseContext, self).__init__(**kwargs)
         self._name = name
         self._id = generate_uuid(variant='uuid')
         self._model = model_storage
         self._resource = resource_storage
         self._service_id = service_id
-        self._execution_id = execution_id
         self._workdir = workdir
+        self._execution_id = execution_id
         self.logger = None
 
     def _register_logger(self, level=None, task_id=None):
         self.logger = self.PrefixedLogger(
-            logging.getLogger(aria_logger.TASK_LOGGER_NAME), self.logging_id, 
task_id=task_id)
+            logging.getLogger(aria_logger.TASK_LOGGER_NAME), task_id=task_id)
         self.logger.setLevel(level or logging.DEBUG)
         if not self.logger.handlers:
-            self.logger.addHandler(aria_logger.create_console_log_handler())
             self.logger.addHandler(self._get_sqla_handler())
 
     def _get_sqla_handler(self):
@@ -104,10 +105,6 @@ class BaseContext(object):
                 self.logger.removeHandler(handler)
 
     @property
-    def logging_id(self):
-        raise NotImplementedError
-
-    @property
     def model(self):
         """
         Access to the model storage

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py 
b/aria/orchestrator/context/operation.py
index c7d8246..c383958 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -29,20 +29,8 @@ class BaseOperationContext(BaseContext):
     Context object used during operation creation and execution
     """
 
-    def __init__(self,
-                 name,
-                 model_storage,
-                 resource_storage,
-                 service_id,
-                 task_id,
-                 actor_id,
-                 **kwargs):
-        super(BaseOperationContext, self).__init__(
-            name=name,
-            model_storage=model_storage,
-            resource_storage=resource_storage,
-            service_id=service_id,
-            **kwargs)
+    def __init__(self, task_id, actor_id, **kwargs):
+        super(BaseOperationContext, self).__init__(**kwargs)
         self._task_id = task_id
         self._actor_id = actor_id
         self._thread_local = threading.local()
@@ -55,10 +43,6 @@ class BaseOperationContext(BaseContext):
         return '{name}({0})'.format(details, name=self.name)
 
     @property
-    def logging_id(self):
-        raise NotImplementedError
-
-    @property
     def task(self):
         """
         The task in the model storage
@@ -119,10 +103,6 @@ class NodeOperationContext(BaseOperationContext):
     """
 
     @property
-    def logging_id(self):
-        return self.node.name or self.node.id
-
-    @property
     def node_template(self):
         """
         the node of the current operation
@@ -145,11 +125,6 @@ class RelationshipOperationContext(BaseOperationContext):
     """
 
     @property
-    def logging_id(self):
-        return '{0}->{1}'.format(self.source_node.name or self.source_node.id,
-                                 self.target_node.name or self.target_node.id)
-
-    @property
     def source_node_template(self):
         """
         The source node

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py 
b/aria/orchestrator/context/workflow.py
index 667d22f..920b237 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -50,8 +50,8 @@ class WorkflowContext(BaseContext):
                 name=self.__class__.__name__, self=self))
 
     @property
-    def logging_id(self):
-        return '{0}[{1}]'.format(self._workflow_name, self._execution_id)
+    def workflow_name(self):
+        return self._workflow_name
 
     @property
     def execution(self):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/aria/orchestrator/execution_plugin/instantiation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/instantiation.py 
b/aria/orchestrator/execution_plugin/instantiation.py
index 7627a38..db33cf3 100644
--- a/aria/orchestrator/execution_plugin/instantiation.py
+++ b/aria/orchestrator/execution_plugin/instantiation.py
@@ -73,8 +73,8 @@ def _configure_remote(operation, configuration, arguments):
     if ('password' not in ssh) and ('key' not in ssh) and ('key_filename' not 
in ssh):
         ssh['password'] = default_password
 
-    arguments['use_sudo'] = ssh.get('use_sudo')
-    arguments['hide_output'] = ssh.get('hide_output')
+    arguments['use_sudo'] = ssh.get('use_sudo', False)
+    arguments['hide_output'] = ssh.get('hide_output', dict(everything=False))
     arguments['fabric_env'] = {}
     if 'warn_only' in ssh:
         arguments['fabric_env']['warn_only'] = ssh['warn_only']

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py 
b/aria/orchestrator/workflow_runner.py
index 1ea60a1..8f25cce 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -90,8 +90,12 @@ class WorkflowRunner(object):
             tasks_graph=self._tasks_graph)
 
     @property
+    def execution_id(self):
+        return self._execution_id
+
+    @property
     def execution(self):
-        return self._model_storage.execution.get(self._execution_id)
+        return self._model_storage.execution.get(self.execution_id)
 
     @property
     def service(self):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py 
b/aria/orchestrator/workflows/core/engine.py
index 155d0ee..5e9b496 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -40,6 +40,7 @@ class Engine(logger.LoggerMixin):
 
     def __init__(self, executor, workflow_context, tasks_graph, **kwargs):
         super(Engine, self).__init__(**kwargs)
+        self.logger.addHandler(logger.NullHandler())
         self._workflow_context = workflow_context
         self._execution_graph = networkx.DiGraph()
         self._executor = executor

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/aria/orchestrator/workflows/events_logging.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/events_logging.py 
b/aria/orchestrator/workflows/events_logging.py
index e831bfe..7d15c81 100644
--- a/aria/orchestrator/workflows/events_logging.py
+++ b/aria/orchestrator/workflows/events_logging.py
@@ -22,45 +22,56 @@ Implementation of logger handlers for workflow and 
operation events.
 """
 
 from .. import events
+from ... import modeling
+
+
+def _get_task_name(task):
+    if isinstance(task.actor, 
modeling.model_bases.service_instance.RelationshipBase):
+        return '{source_node.name}->{target_node.name}'.format(
+            source_node=task.actor.source_node, 
target_node=task.actor.target_node)
+    else:
+        return task.actor.name
 
 
 @events.start_task_signal.connect
 def _start_task_handler(task, **kwargs):
-    task.context.logger.debug('Event: Starting task: 
{task.name}'.format(task=task))
+    task.context.logger.info('{name} 
{task.interface_name}.{task.operation_name} started...'
+                             .format(name=_get_task_name(task), task=task))
 
 
 @events.on_success_task_signal.connect
 def _success_task_handler(task, **kwargs):
-    task.context.logger.debug('Event: Task success: 
{task.name}'.format(task=task))
+    task.context.logger.info('{name} 
{task.interface_name}.{task.operation_name} successful'
+                             .format(name=_get_task_name(task), task=task))
 
 
 @events.on_failure_task_signal.connect
-def _failure_operation_handler(task, exception, **kwargs):
-    error = '{0}: {1}'.format(type(exception).__name__, exception)
-    task.context.logger.error('Event: Task failure: {task.name} 
[{error}]'.format(
-        task=task, error=error))
-
+def _failure_operation_handler(task, traceback, **kwargs):
+    task.context.logger.error(
+        '{name} {task.interface_name}.{task.operation_name} failed'
+        .format(name=_get_task_name(task), task=task), 
extra=dict(traceback=traceback)
+    )
 
 @events.start_workflow_signal.connect
 def _start_workflow_handler(context, **kwargs):
-    context.logger.debug('Event: Starting workflow: 
{context.name}'.format(context=context))
+    context.logger.info("Starting '{ctx.workflow_name}' workflow 
execution".format(ctx=context))
 
 
 @events.on_failure_workflow_signal.connect
 def _failure_workflow_handler(context, **kwargs):
-    context.logger.debug('Event: Workflow failure: 
{context.name}'.format(context=context))
+    context.logger.info("'{ctx.workflow_name}' workflow execution 
failed".format(ctx=context))
 
 
 @events.on_success_workflow_signal.connect
 def _success_workflow_handler(context, **kwargs):
-    context.logger.debug('Event: Workflow success: 
{context.name}'.format(context=context))
+    context.logger.info("'{ctx.workflow_name}' workflow execution 
succeeded".format(ctx=context))
 
 
 @events.on_cancelled_workflow_signal.connect
 def _cancel_workflow_handler(context, **kwargs):
-    context.logger.debug('Event: Workflow cancelled: 
{context.name}'.format(context=context))
+    context.logger.info("'{ctx.workflow_name}' workflow execution 
canceled".format(ctx=context))
 
 
 @events.on_cancelling_workflow_signal.connect
 def _cancelling_workflow_handler(context, **kwargs):
-    context.logger.debug('Event: Workflow cancelling: 
{context.name}'.format(context=context))
+    context.logger.info("Cancelling '{ctx.workflow_name}' workflow 
execution".format(ctx=context))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py 
b/aria/orchestrator/workflows/executor/base.py
index 4ae046d..39becef 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -44,8 +44,8 @@ class BaseExecutor(logger.LoggerMixin):
         events.start_task_signal.send(task)
 
     @staticmethod
-    def _task_failed(task, exception):
-        events.on_failure_task_signal.send(task, exception=exception)
+    def _task_failed(task, exception, traceback=None):
+        events.on_failure_task_signal.send(task, exception=exception, 
traceback=traceback)
 
     @staticmethod
     def _task_succeeded(task):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py 
b/aria/orchestrator/workflows/executor/dry.py
index d894b25..e1261bb 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -34,15 +34,19 @@ class DryExecutor(BaseExecutor):
             task.started_at = datetime.utcnow()
             task.status = task.STARTED
 
-        actor_type = type(task.actor).__name__.lower()
-        implementation = '{0} > '.format(task.plugin) if task.plugin else ''
-        implementation += task.implementation
-        inputs = dict(inp.unwrap() for inp in task.inputs.values())
+        if hasattr(task.actor, 'source_node'):
+            name = '{source_node.name}->{target_node.name}'.format(
+                source_node=task.actor.source_node, 
target_node=task.actor.target_node)
+        else:
+            name = task.actor.name
 
         task.context.logger.info(
-            'Executing {actor_type} {task.actor.name} operation 
{task.interface_name} '
-            '{task.operation_name}: {implementation} (Inputs: {inputs})'
-            .format(actor_type=actor_type, task=task, 
implementation=implementation, inputs=inputs))
+            '<dry> {name} {task.interface_name}.{task.operation_name} 
started...'
+            .format(name=name, task=task))
+
+        task.context.logger.info(
+            '<dry> {name} {task.interface_name}.{task.operation_name} 
successful'
+            .format(name=name, task=task))
 
         # updating the task manually instead of calling 
self._task_succeeded(task),
         # to avoid any side effects raising that event might cause

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py 
b/aria/orchestrator/workflows/executor/process.py
index 851d78e..2378e0a 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -42,13 +42,16 @@ import pickle
 import jsonpickle
 
 import aria
-from aria.extension import process_executor
-from aria.utils import imports
-from aria.utils import exceptions
 from aria.orchestrator.workflows.executor import base
 from aria.storage import instrumentation
+from aria.extension import process_executor
+from aria.utils import (
+    imports,
+    exceptions
+)
 from aria.modeling import types as modeling_types
 
+
 _IS_WIN = os.name == 'nt'
 
 _INT_FMT = 'I'
@@ -233,9 +236,10 @@ class ProcessExecutor(base.BaseExecutor):
         except BaseException as e:
             e.message += 'Task failed due to 
{0}.'.format(request['exception']) + \
                          UPDATE_TRACKED_CHANGES_FAILED_STR
-            self._task_failed(task, exception=e)
+            self._task_failed(
+                task, exception=e, 
traceback=exceptions.get_exception_as_string(*sys.exc_info()))
         else:
-            self._task_failed(task, exception=request['exception'])
+            self._task_failed(task, exception=request['exception'], 
traceback=request['traceback'])
 
     def _handle_apply_tracked_changes_request(self, task_id, request, 
response):
         task = self._tasks[task_id]
@@ -319,6 +323,7 @@ class _Messenger(object):
                 'type': type,
                 'task_id': self.task_id,
                 'exception': exceptions.wrap_if_needed(exception),
+                'traceback': 
exceptions.get_exception_as_string(*sys.exc_info()),
                 'tracked_changes': tracked_changes
             })
             response = _recv_message(sock)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py 
b/aria/orchestrator/workflows/executor/thread.py
index f422592..836b2bf 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -20,7 +20,9 @@ Thread based executor
 import Queue
 import threading
 
-from aria.utils import imports
+import sys
+
+from aria.utils import imports, exceptions
 
 from .base import BaseExecutor
 
@@ -63,7 +65,9 @@ class ThreadExecutor(BaseExecutor):
                     task_func(ctx=task.context, **inputs)
                     self._task_succeeded(task)
                 except BaseException as e:
-                    self._task_failed(task, exception=e)
+                    self._task_failed(task,
+                                      exception=e,
+                                      
traceback=exceptions.get_exception_as_string(*sys.exc_info()))
             # Daemon threads
             except BaseException as e:
                 pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/tests/.pylintrc
----------------------------------------------------------------------
diff --git a/tests/.pylintrc b/tests/.pylintrc
index eead6e8..9795bfc 100644
--- a/tests/.pylintrc
+++ b/tests/.pylintrc
@@ -369,7 +369,7 @@ max-statements=50
 max-parents=7
 
 # Maximum number of attributes for a class (see R0902).
-max-attributes=15
+max-attributes=25
 
 # Minimum number of public methods for a class (see R0903).
 min-public-methods=0

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py 
b/tests/orchestrator/workflows/executor/__init__.py
index ae1e83e..c05831a 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -12,3 +12,54 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import uuid
+import logging
+from collections import namedtuple
+from contextlib import contextmanager
+
+from aria.modeling import models
+
+
+class MockTask(object):
+
+    INFINITE_RETRIES = models.Task.INFINITE_RETRIES
+
+    def __init__(self, implementation, inputs=None, plugin=None):
+        self.implementation = self.name = implementation
+        self.plugin_fk = plugin.id if plugin else None
+        self.plugin = plugin or None
+        self.inputs = inputs or {}
+        self.states = []
+        self.exception = None
+        self.id = str(uuid.uuid4())
+        self.logger = logging.getLogger()
+        self.context = MockContext()
+        self.retry_count = 0
+        self.max_attempts = 1
+        self.ignore_failure = False
+        self.interface_name = 'interface_name'
+        self.operation_name = 'operation_name'
+        self.actor = namedtuple('actor', 'name')(name='actor_name')
+        self.model_task = None
+
+        for state in models.Task.STATES:
+            setattr(self, state.upper(), state)
+
+    @contextmanager
+    def _update(self):
+        yield self
+
+
+class MockContext(object):
+
+    def __init__(self):
+        self.logger = logging.getLogger('mock_logger')
+        self.task = type('SubprocessMockTask', (object, ), {'plugin': None})
+        self.serialization_dict = {'context_cls': self.__class__, 'context': 
{}}
+
+    def __getattr__(self, item):
+        return None
+
+    @classmethod
+    def deserialize_from_dict(cls, **kwargs):
+        return cls()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py 
b/tests/orchestrator/workflows/executor/test_executor.py
index a7619de..d4482ae 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -13,9 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
-import uuid
-from contextlib import contextmanager
 
 import pytest
 import retrying
@@ -37,14 +34,19 @@ from aria.orchestrator.workflows.executor import (
 )
 
 import tests
+from . import MockTask
+
+
+def _get_implementation(func):
+    return '{module}.{func.__name__}'.format(module=__name__, func=func)
 
 
 def test_execute(executor):
     expected_value = 'value'
-    successful_task = MockTask(mock_successful_task)
-    failing_task = MockTask(mock_failing_task)
-    task_with_inputs = MockTask(mock_task_with_input, inputs={'input': 
models.Parameter.wrap(
-        'input', 'value')})
+    successful_task = MockTask(_get_implementation(mock_successful_task))
+    failing_task = MockTask(_get_implementation(mock_failing_task))
+    task_with_inputs = MockTask(_get_implementation(mock_task_with_input),
+                                inputs={'input': 
models.Parameter.wrap('input', 'value')})
 
     for task in [successful_task, failing_task, task_with_inputs]:
         executor.execute(task)
@@ -81,54 +83,6 @@ class MockException(Exception):
     pass
 
 
-class MockContext(object):
-
-    def __init__(self, *args, **kwargs):
-        self.logger = logging.getLogger()
-        self.task = type('SubprocessMockTask', (object, ), {'plugin': None})
-        self.serialization_dict = {'context_cls': self.__class__, 'context': 
{}}
-
-    def __getattr__(self, item):
-        return None
-
-    @classmethod
-    def deserialize_from_dict(cls, **kwargs):
-        return cls()
-
-
-class MockTask(object):
-
-    INFINITE_RETRIES = models.Task.INFINITE_RETRIES
-
-    def __init__(self, func, inputs=None):
-        self.states = []
-        self.exception = None
-        self.id = str(uuid.uuid4())
-        name = func.__name__
-        implementation = '{module}.{name}'.format(
-            module=__name__,
-            name=name)
-        self.implementation = implementation
-        self.logger = logging.getLogger()
-        self.name = name
-        self.inputs = inputs or {}
-        self.context = MockContext()
-        self.retry_count = 0
-        self.max_attempts = 1
-        self.plugin_fk = None
-        self.ignore_failure = False
-        self.interface_name = 'interface_name'
-        self.operation_name = 'operation_name'
-        self.model_task = None
-
-        for state in models.Task.STATES:
-            setattr(self, state.upper(), state)
-
-    @contextmanager
-    def _update(self):
-        yield self
-
-
 @pytest.fixture(params=[
     (thread.ThreadExecutor, {'pool_size': 1}),
     (thread.ThreadExecutor, {'pool_size': 2}),

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/59d5301f/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py 
b/tests/orchestrator/workflows/executor/test_process_executor.py
index 839b9f1..b353518 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -15,13 +15,10 @@
 
 import logging
 import os
-import uuid
 import Queue
-from contextlib import contextmanager
 
 import pytest
 
-from aria.modeling import models as aria_models
 from aria.orchestrator import events
 from aria.utils.plugin import create as create_plugin
 from aria.orchestrator.workflows.executor import process
@@ -33,17 +30,17 @@ from tests.fixtures import (  # pylint: 
disable=unused-import
     plugin_manager,
     fs_model as model
 )
+from . import MockTask
 
 
 class TestProcessExecutor(object):
 
     def test_plugin_execution(self, executor, mock_plugin):
-        task = MockTask(plugin=mock_plugin,
-                        implementation='mock_plugin1.operation')
+        task = MockTask('mock_plugin1.operation', plugin=mock_plugin)
 
         queue = Queue.Queue()
 
-        def handler(_, exception=None):
+        def handler(_, exception=None, **kwargs):
             queue.put(exception)
 
         events.on_success_task_signal.connect(handler)
@@ -100,31 +97,3 @@ class MockContext(object):
     @classmethod
     def deserialize_from_dict(cls, **kwargs):
         return cls()
-
-
-class MockTask(object):
-
-    INFINITE_RETRIES = aria_models.Task.INFINITE_RETRIES
-
-    def __init__(self, plugin, implementation):
-        self.id = str(uuid.uuid4())
-        self.implementation = implementation
-        self.logger = logging.getLogger()
-        self.name = implementation
-        self.inputs = {}
-        self.context = MockContext()
-        self.retry_count = 0
-        self.max_attempts = 1
-        self.plugin_fk = plugin.id
-        self.plugin = plugin
-        self.ignore_failure = False
-        self.interface_name = 'interface_name'
-        self.operation_name = 'operation_name'
-        self.model_task = None
-
-        for state in aria_models.Task.STATES:
-            setattr(self, state.upper(), state)
-
-    @contextmanager
-    def _update(self):
-        yield self


Reply via email to