Repository: incubator-ariatosca Updated Branches: refs/heads/logger_task [created] 167080780
wip Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/16708078 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/16708078 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/16708078 Branch: refs/heads/logger_task Commit: 16708078066f69733c5c2838c658e7977b29ae4e Parents: 9df203e Author: max-orlov <[email protected]> Authored: Wed Apr 5 19:15:04 2017 +0300 Committer: max-orlov <[email protected]> Committed: Wed Apr 5 19:15:04 2017 +0300 ---------------------------------------------------------------------- aria/cli/commands/executions.py | 12 +++- aria/cli/logger.py | 66 +++++++++++++++++++- aria/logger.py | 1 - aria/modeling/orchestration.py | 4 +- aria/orchestrator/context/common.py | 14 +---- aria/orchestrator/context/operation.py | 13 ---- aria/orchestrator/context/workflow.py | 4 +- aria/orchestrator/workflow_runner.py | 6 +- aria/orchestrator/workflows/events_logging.py | 19 +++--- aria/orchestrator/workflows/executor/process.py | 6 +- 10 files changed, 100 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/16708078/aria/cli/commands/executions.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py index cc8bf6c..dd62e94 100644 --- a/aria/cli/commands/executions.py +++ b/aria/cli/commands/executions.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +from aria.cli.logger import LogConsumer from .. import utils from ..table import print_data from ..cli import aria @@ -139,13 +139,19 @@ def start(workflow_name, logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else '')) execution_thread.start() + + log_consumer = LogConsumer(model_storage, workflow_runner.execution_id) try: while execution_thread.is_alive(): - # using join without a timeout blocks and ignores KeyboardInterrupt - execution_thread.join(1) + for log in log_consumer: + logger.log(log) + except KeyboardInterrupt: _cancel_execution(workflow_runner, execution_thread, logger) + for log in log_consumer: + logger.log(log) + # raise any errors from the execution thread (note these are not workflow execution errors) execution_thread.raise_error_if_exists() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/16708078/aria/cli/logger.py ---------------------------------------------------------------------- diff --git a/aria/cli/logger.py b/aria/cli/logger.py index 289dbd3..bfafbfa 100644 --- a/aria/cli/logger.py +++ b/aria/cli/logger.py @@ -51,6 +51,42 @@ DEFAULT_LOGGER_CONFIG = { } +class _ModelLogLogger(object): + def __init__(self, logger, level, formats): + self._logger = logger + self._level = level + self._formats = formats + + def log(self, item): + kwargs = dict(item=item) + formats = self._formats[self.level] + if 'created_at' in formats: + kwargs['created_at'] = item.created_at.strftime(formats['created_at']) + if 'level' in formats: + kwargs['level'] = formats['level'].format(item.level) + if 'msg' in formats: + kwargs['msg'] = formats['msg'].format(item.msg) + + if 'actor' in formats and item.task: + kwargs['actor'] = formats['actor'].format(item.task.actor) + if 'execution' in formats: + kwargs['execution'] = formats['execution'].format(item.execution) + + msg = formats['main_msg'].format(**kwargs) + return getattr(self._logger, item.level.lower())(msg) + + @property + def level(self): + return logging.INFO if self._level is NO_VERBOSE else logging.DEBUG + + @level.setter + def level(self, level): + self._level = level + + def __getattr__(self, item): + return getattr(self._logger, item) + + class Logging(object): def __init__(self, config): @@ -58,7 +94,18 @@ class Logging(object): self._verbosity_level = NO_VERBOSE self._all_loggers = [] self._configure_loggers(config) - self._lgr = logging.getLogger('aria.cli.main') + + self._lgr = _ModelLogLogger( + logging.getLogger('aria.cli.main'), + self._verbosity_level, + {logging.INFO: { + 'main_msg': '{item.msg}', + }, + logging.DEBUG: { + 'main_msg': '{created_at} | {item.level[0]} | {item.msg}', + 'created_at': '%H:%M:%S' + } + }) @property def logger(self): @@ -77,7 +124,7 @@ class Logging(object): @verbosity_level.setter def verbosity_level(self, level): - self._verbosity_level = level + self._verbosity_level = self._lgr.level = level if self.is_high_verbose_level(): for logger_name in self._all_loggers: logging.getLogger(logger_name).setLevel(logging.DEBUG) @@ -111,3 +158,18 @@ class Logging(object): self._all_loggers.append(logger_name) logging.config.dictConfig(logger_dict) + + +class LogConsumer(object): + + def __init__(self, model_storage, execution_id): + self._last_visited_id = 0 + self._model_storage = model_storage + self._execution_id = execution_id + + def __iter__(self): + + for log in self._model_storage.log.iter(filters=dict( + execution_fk=self._execution_id, id=dict(gt=self._last_visited_id) )): + self._last_visited_id = log.id + yield log http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/16708078/aria/logger.py ---------------------------------------------------------------------- diff --git a/aria/logger.py b/aria/logger.py index e3039f5..26ba1e3 100644 --- a/aria/logger.py +++ b/aria/logger.py @@ -165,7 +165,6 @@ class _SQLAlchemyHandler(logging.Handler): log = self._cls( execution_fk=self._execution_id, task_fk=record.task_id, - actor=record.prefix, level=record.levelname, msg=str(record.msg), created_at=created_at, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/16708078/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index 3ad6b58..bf7012f 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -394,7 +394,6 @@ class LogBase(ModelMixin): level = Column(String) msg = Column(String) created_at = Column(DateTime, index=True) - actor = Column(String) # region foreign keys @@ -409,5 +408,4 @@ class LogBase(ModelMixin): # endregion def __repr__(self): - return "<{self.created_at}: [{self.level}] @{self.actor}> {msg}".format( - self=self, msg=self.msg[:50]) + return "{msg}".format(msg=self.msg) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/16708078/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 2e33d77..452f5d0 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -38,15 +38,13 @@ class BaseContext(object): """ class PrefixedLogger(object): - def __init__(self, logger, prefix='', task_id=None): + def __init__(self, logger, task_id=None): self._logger = logger - self._prefix = prefix self._task_id = task_id def __getattr__(self, item): if item.upper() in logging._levelNames: - return partial(getattr(self._logger, item), - extra={'prefix': self._prefix, 'task_id': self._task_id}) + return partial(getattr(self._logger, item), extra=dict(task_id=self._task_id)) else: return getattr(self._logger, item) @@ -68,11 +66,9 @@ class BaseContext(object): self.logger = None def _register_logger(self, level=None, task_id=None): - self.logger = self.PrefixedLogger( - logging.getLogger('aria.executions.task'), self.logging_id, task_id=task_id) + self.logger = self.PrefixedLogger(logging.getLogger('aria.orchestrator'), task_id=task_id) self.logger.setLevel(level or logging.DEBUG) if not self.logger.handlers: - self.logger.addHandler(aria_logger.create_console_log_handler()) self.logger.addHandler(self._get_sqla_handler()) def _get_sqla_handler(self): @@ -102,10 +98,6 @@ class BaseContext(object): self.logger.removeHandler(handler) @property - def logging_id(self): - raise NotImplementedError - - @property def model(self): """ Access to the model storage http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/16708078/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index cbd186c..6aed377 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -57,10 +57,6 @@ class BaseOperationContext(BaseContext): return '{name}({0})'.format(details, name=self.name) @property - def logging_id(self): - raise NotImplementedError - - @property def task(self): """ The task in the model storage @@ -121,10 +117,6 @@ class NodeOperationContext(BaseOperationContext): """ @property - def logging_id(self): - return self.node.name or self.node.id - - @property def node_template(self): """ the node of the current operation @@ -147,11 +139,6 @@ class RelationshipOperationContext(BaseOperationContext): """ @property - def logging_id(self): - return '{0}->{1}'.format(self.source_node.name or self.source_node.id, - self.target_node.name or self.target_node.id) - - @property def source_node_template(self): """ The source node http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/16708078/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py index ad4a2ff..9d8c0d1 100644 --- a/aria/orchestrator/context/workflow.py +++ b/aria/orchestrator/context/workflow.py @@ -52,8 +52,8 @@ class WorkflowContext(BaseContext): name=self.__class__.__name__, self=self)) @property - def logging_id(self): - return '{0}[{1}]'.format(self._workflow_name, self._execution_id) + def workflow_name(self): + return self._workflow_name @property def execution(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/16708078/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 65c0d4c..eff5347 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -90,8 +90,12 @@ class WorkflowRunner(object): tasks_graph=self._tasks_graph) @property + def execution_id(self): + return self._execution_id + + @property def execution(self): - return self._model_storage.execution.get(self._execution_id) + return self._model_storage.execution.get(self.execution_id) @property def service(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/16708078/aria/orchestrator/workflows/events_logging.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py index e831bfe..07981e9 100644 --- a/aria/orchestrator/workflows/events_logging.py +++ b/aria/orchestrator/workflows/events_logging.py @@ -26,16 +26,21 @@ from .. import events @events.start_task_signal.connect def _start_task_handler(task, **kwargs): - task.context.logger.debug('Event: Starting task: {task.name}'.format(task=task)) + task.context.logger.debug('{actor.name} {task.interface_name}.{task.operation_name} started...' + .format(actor=task.actor, task=task)) @events.on_success_task_signal.connect def _success_task_handler(task, **kwargs): - task.context.logger.debug('Event: Task success: {task.name}'.format(task=task)) + task.context.logger.info('{actor.name} {task.interface_name}.{task.operation_name} successful' + .format(actor=task.actor, task=task)) @events.on_failure_task_signal.connect def _failure_operation_handler(task, exception, **kwargs): + # todo: add full support for exceptions and errors logging + task.context.logger.info('{actor.name} {task.interface_name}.{task.operation_name} failed' + .format(actor=task.actor, task=task)) error = '{0}: {1}'.format(type(exception).__name__, exception) task.context.logger.error('Event: Task failure: {task.name} [{error}]'.format( task=task, error=error)) @@ -43,24 +48,24 @@ def _failure_operation_handler(task, exception, **kwargs): @events.start_workflow_signal.connect def _start_workflow_handler(context, **kwargs): - context.logger.debug('Event: Starting workflow: {context.name}'.format(context=context)) + context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context)) @events.on_failure_workflow_signal.connect def _failure_workflow_handler(context, **kwargs): - context.logger.debug('Event: Workflow failure: {context.name}'.format(context=context)) + context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context)) @events.on_success_workflow_signal.connect def _success_workflow_handler(context, **kwargs): - context.logger.debug('Event: Workflow success: {context.name}'.format(context=context)) + context.logger.info("'{ctx.workflow_name}' workflow execution succeeded".format(ctx=context)) @events.on_cancelled_workflow_signal.connect def _cancel_workflow_handler(context, **kwargs): - context.logger.debug('Event: Workflow cancelled: {context.name}'.format(context=context)) + context.logger.info("'{ctx.workflow_name}' workflow execution canceled".format(ctx=context)) @events.on_cancelling_workflow_signal.connect def _cancelling_workflow_handler(context, **kwargs): - context.logger.debug('Event: Workflow cancelling: {context.name}'.format(context=context)) + context.logger.info("Cancelling '{ctx.workflow_name}' workflow execution".format(ctx=context)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/16708078/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index dc369ab..83d8b55 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -148,8 +148,10 @@ class ProcessExecutor(base.BaseExecutor): def _create_arguments_dict(self, task): return { 'task_id': task.id, - 'implementation': task.implementation, - 'operation_inputs': Parameter.unwrap_dict(task.inputs), + # 'implementation': task.implementation, + 'implementation': 'aria.orchestrator.execution_plugin.operations.run_script_locally', + # 'operation_inputs': Parameter.unwrap_dict(task.inputs), + 'operation_inputs': dict(script_path=task.implementation), 'port': self._server_port, 'context': task.context.serialization_dict, }
