Repository: incubator-ariatosca Updated Branches: refs/heads/stub_task_branch [created] 222bef595
wip Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/282fcbf9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/282fcbf9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/282fcbf9 Branch: refs/heads/stub_task_branch Commit: 282fcbf9b51e05fe1d1fef2a0835e9c4654dea3f Parents: 1f3e7ff Author: max-orlov <[email protected]> Authored: Sun Apr 30 16:05:27 2017 +0300 Committer: max-orlov <[email protected]> Committed: Sun Apr 30 16:05:27 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/events.py | 8 ++++ aria/orchestrator/workflows/core/engine.py | 6 ++- aria/orchestrator/workflows/core/task.py | 15 +++++++- aria/orchestrator/workflows/core/translation.py | 4 +- aria/orchestrator/workflows/events_logging.py | 12 ++++-- aria/orchestrator/workflows/executor/base.py | 4 ++ aria/orchestrator/workflows/executor/dry.py | 39 +++++++++++++------- tests/end2end/test_hello_world.py | 3 +- 8 files changed, 69 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/282fcbf9/aria/orchestrator/events.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/events.py b/aria/orchestrator/events.py index a1c4922..812040b 100644 --- a/aria/orchestrator/events.py +++ b/aria/orchestrator/events.py @@ -28,6 +28,14 @@ start_task_signal = signal('start_task_signal') on_success_task_signal = signal('success_task_signal') on_failure_task_signal = signal('failure_task_signal') +# node state signals: +# Note that each signal corresponds with a task. The basic start_task_signal also changes the state +# of the node on which it runs. (so does the on_success_task_signal and the on_failure_task_signal) +start_node_signal = signal('start_task_signal') +on_success_node_signal = signal('success_task_signal') +on_failure_node_signal = signal('failure_task_signal') + + # workflow engine workflow signals: start_workflow_signal = signal('start_workflow_signal') on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/282fcbf9/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 155d0ee..97c4999 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -44,7 +44,8 @@ class Engine(logger.LoggerMixin): self._execution_graph = networkx.DiGraph() self._executor = executor translation.build_execution_graph(task_graph=tasks_graph, - execution_graph=self._execution_graph) + execution_graph=self._execution_graph, + executor=self._executor) def execute(self): """ @@ -110,11 +111,12 @@ class Engine(logger.LoggerMixin): yield task def _handle_executable_task(self, task): + # import pydevd; pydevd.settrace('localhost', suspend=False) if isinstance(task, engine_task.StubTask): task.status = models.Task.SUCCESS else: events.sent_task_signal.send(task) - self._executor.execute(task) + task.execute() def _handle_ended_tasks(self, task): if task.status == models.Task.FAILED and not task.ignore_failure: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/282fcbf9/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py index 2b26152..548bf47 100644 --- a/aria/orchestrator/workflows/core/task.py +++ b/aria/orchestrator/workflows/core/task.py @@ -28,6 +28,7 @@ from functools import ( from ....modeling import models from ...context import operation as operation_context from .. import exceptions +from ..executor import dry def _locked(func=None): @@ -47,9 +48,13 @@ class BaseTask(object): Base class for Task objects """ - def __init__(self, id, *args, **kwargs): + def __init__(self, id, executor, *args, **kwargs): super(BaseTask, self).__init__(*args, **kwargs) self._id = id + self._executor = executor + + def execute(self): + self._executor.execute(self) @property def id(self): @@ -65,7 +70,7 @@ class StubTask(BaseTask): """ def __init__(self, *args, **kwargs): - super(StubTask, self).__init__(*args, **kwargs) + super(StubTask, self).__init__(executor=dry.StubExecutor(), *args, **kwargs) self.status = models.Task.PENDING self.due_at = datetime.utcnow() @@ -128,6 +133,12 @@ class OperationTask(BaseTask): raise RuntimeError('No operation context could be created for {actor.model_cls}' .format(actor=api_task.actor)) + # TODO: this executor should be put into the task (if no executor was setup in the + # operation) + # executor = '{module}.{name}'.format(module=self._executor.__module__, + # name=self._executor.__class__.__name__ + # ) + task_model = create_task_model( name=api_task.name, implementation=api_task.implementation, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/282fcbf9/aria/orchestrator/workflows/core/translation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py index b6cbdad..1ae59a3 100644 --- a/aria/orchestrator/workflows/core/translation.py +++ b/aria/orchestrator/workflows/core/translation.py @@ -24,6 +24,7 @@ from . import task as core_task def build_execution_graph( task_graph, execution_graph, + executor, start_cls=core_task.StartWorkflowTask, end_cls=core_task.EndWorkflowTask, depends_on=()): @@ -49,13 +50,14 @@ def build_execution_graph( if isinstance(api_task, api.task.OperationTask): # Add the task an the dependencies - operation_task = core_task.OperationTask(api_task) + operation_task = core_task.OperationTask(api_task, executor=executor) _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies) elif isinstance(api_task, api.task.WorkflowTask): # Build the graph recursively while adding start and end markers build_execution_graph( task_graph=api_task, execution_graph=execution_graph, + executor=executor, start_cls=core_task.StartSubWorkflowTask, end_cls=core_task.EndSubWorkflowTask, depends_on=operation_dependencies http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/282fcbf9/aria/orchestrator/workflows/events_logging.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py index 3ffe18b..fa993d0 100644 --- a/aria/orchestrator/workflows/events_logging.py +++ b/aria/orchestrator/workflows/events_logging.py @@ -34,19 +34,25 @@ def _get_task_name(task): @events.start_task_signal.connect -def _start_task_handler(task, **kwargs): +def _start_task_handler(task, skip_logging=False, **kwargs): + if skip_logging: + return task.context.logger.info('{name} {task.interface_name}.{task.operation_name} started...' .format(name=_get_task_name(task), task=task)) @events.on_success_task_signal.connect -def _success_task_handler(task, **kwargs): +def _success_task_handler(task, skip_logging=False, **kwargs): + if skip_logging: + return task.context.logger.info('{name} {task.interface_name}.{task.operation_name} successful' .format(name=_get_task_name(task), task=task)) @events.on_failure_task_signal.connect -def _failure_operation_handler(task, traceback, **kwargs): +def _failure_operation_handler(task, traceback, skip_logging=False, **kwargs): + if skip_logging: + return task.context.logger.error( '{name} {task.interface_name}.{task.operation_name} failed' .format(name=_get_task_name(task), task=task), extra=dict(traceback=traceback) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/282fcbf9/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py index 39becef..f11a6b7 100644 --- a/aria/orchestrator/workflows/executor/base.py +++ b/aria/orchestrator/workflows/executor/base.py @@ -40,6 +40,10 @@ class BaseExecutor(logger.LoggerMixin): pass @staticmethod + def _task_sent(task): + events.sent_task_signal.send(task) + + @staticmethod def _task_started(task): events.start_task_signal.send(task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/282fcbf9/aria/orchestrator/workflows/executor/dry.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py index e1261bb..5be8015 100644 --- a/aria/orchestrator/workflows/executor/dry.py +++ b/aria/orchestrator/workflows/executor/dry.py @@ -19,21 +19,40 @@ Dry executor from datetime import datetime +from aria.orchestrator import events from .base import BaseExecutor -class DryExecutor(BaseExecutor): +# TODO: the name of this module should definitely change + +class StubExecutor(BaseExecutor): + + @staticmethod + def _task_sent(*args, **kwargs): + pass + + @staticmethod + def _task_started(task): + events.start_task_signal.send(task, skip_logging=True) + + @staticmethod + def _task_succeeded(task): + events.on_success_task_signal.send(task, skip_logging=True) + + @staticmethod + def _task_failed(*args, **kwargs): + pass + + def execute(self, task): + pass + + +class DryExecutor(StubExecutor): """ Executor which dry runs tasks - prints task information without causing any side effects """ def execute(self, task): - # updating the task manually instead of calling self._task_started(task), - # to avoid any side effects raising that event might cause - with task._update(): - task.started_at = datetime.utcnow() - task.status = task.STARTED - if hasattr(task.actor, 'source_node'): name = '{source_node.name}->{target_node.name}'.format( source_node=task.actor.source_node, target_node=task.actor.target_node) @@ -47,9 +66,3 @@ class DryExecutor(BaseExecutor): task.context.logger.info( '<dry> {name} {task.interface_name}.{task.operation_name} successful' .format(name=name, task=task)) - - # updating the task manually instead of calling self._task_succeeded(task), - # to avoid any side effects raising that event might cause - with task._update(): - task.ended_at = datetime.utcnow() - task.status = task.SUCCESS http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/282fcbf9/tests/end2end/test_hello_world.py ---------------------------------------------------------------------- diff --git a/tests/end2end/test_hello_world.py b/tests/end2end/test_hello_world.py index 09e5d06..8895593 100644 --- a/tests/end2end/test_hello_world.py +++ b/tests/end2end/test_hello_world.py @@ -30,7 +30,8 @@ def test_hello_world(testenv): # Even if some assertions failed, attempt to execute uninstall so the # webserver process doesn't stay up once the test is finished # TODO: remove force_service_delete=True - testenv.uninstall_service(force_service_delete=True) + pass + # testenv.uninstall_service(force_service_delete=True) _verify_webserver_down('http://localhost:9090') testenv.verify_clean_storage()
