Repository: incubator-ariatosca Updated Branches: refs/heads/stub_task_branch 9bdc64537 -> 55366e64b (forced update)
wip2 Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/55366e64 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/55366e64 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/55366e64 Branch: refs/heads/stub_task_branch Commit: 55366e64b69795363e897a6bd185f12ec14990ea Parents: 282fcbf Author: max-orlov <ma...@gigaspaces.com> Authored: Sun Apr 30 19:54:12 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Sun Apr 30 20:11:04 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/events.py | 8 ---- aria/orchestrator/workflows/api/task.py | 34 ++++++++------- aria/orchestrator/workflows/builtin/utils.py | 38 ++++++++-------- aria/orchestrator/workflows/core/engine.py | 10 ++--- .../workflows/core/events_handler.py | 3 ++ aria/orchestrator/workflows/core/task.py | 46 +++++++++++++------- aria/orchestrator/workflows/core/translation.py | 13 +++--- aria/orchestrator/workflows/executor/dry.py | 32 +++++--------- tests/end2end/test_hello_world.py | 5 +-- .../test_task_graph_into_execution_graph.py | 4 +- 10 files changed, 98 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55366e64/aria/orchestrator/events.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/events.py b/aria/orchestrator/events.py index 812040b..a1c4922 100644 --- a/aria/orchestrator/events.py +++ b/aria/orchestrator/events.py @@ -28,14 +28,6 @@ start_task_signal = signal('start_task_signal') on_success_task_signal = signal('success_task_signal') on_failure_task_signal = signal('failure_task_signal') -# node state signals: -# Note that each signal corresponds with a task. 
The basic start_task_signal also changes the state -# of the node on which it runs. (so does the on_success_task_signal and the on_failure_task_signal) -start_node_signal = signal('start_task_signal') -on_success_node_signal = signal('success_task_signal') -on_failure_node_signal = signal('failure_task_signal') - - # workflow engine workflow signals: start_workflow_signal = signal('start_workflow_signal') on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55366e64/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index 82c40c3..15397c3 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -66,7 +66,8 @@ class OperationTask(BaseTask): inputs=None, max_attempts=None, retry_interval=None, - ignore_failure=None): + ignore_failure=None, + is_stub=False): """ Do not call this constructor directly. Instead, use :meth:`for_node` or :meth:`for_relationship`. 
@@ -87,15 +88,18 @@ class OperationTask(BaseTask): if ignore_failure is None else ignore_failure) self.interface_name = interface_name self.operation_name = operation_name + self.name = OperationTask.NAME_FORMAT.format(type=actor_type, + name=actor.name, + interface=self.interface_name, + operation=self.operation_name) + self.is_stub = is_stub + if self.is_stub: + return operation = self.actor.interfaces[self.interface_name].operations[self.operation_name] self.plugin = operation.plugin self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs) self.implementation = operation.implementation - self.name = OperationTask.NAME_FORMAT.format(type=actor_type, - name=actor.name, - interface=self.interface_name, - operation=self.operation_name) def __repr__(self): return self.name @@ -108,7 +112,8 @@ class OperationTask(BaseTask): max_attempts=None, retry_interval=None, ignore_failure=None, - inputs=None): + inputs=None, + is_stub=False): """ Creates an operation on a node. @@ -132,7 +137,9 @@ class OperationTask(BaseTask): max_attempts=max_attempts, retry_interval=retry_interval, ignore_failure=ignore_failure, - inputs=inputs) + inputs=inputs, + is_stub=is_stub + ) @classmethod def for_relationship(cls, @@ -142,7 +149,8 @@ class OperationTask(BaseTask): max_attempts=None, retry_interval=None, ignore_failure=None, - inputs=None): + inputs=None, + is_stub=False): """ Creates an operation on a relationship edge. @@ -166,7 +174,9 @@ class OperationTask(BaseTask): max_attempts=max_attempts, retry_interval=retry_interval, ignore_failure=ignore_failure, - inputs=inputs) + inputs=inputs, + is_stub=is_stub + ) class WorkflowTask(BaseTask): @@ -197,9 +207,3 @@ class WorkflowTask(BaseTask): return getattr(self._graph, item) except AttributeError: return super(WorkflowTask, self).__getattribute__(item) - - -class StubTask(BaseTask): - """ - Enables creating empty tasks. 
- """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55366e64/aria/orchestrator/workflows/builtin/utils.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/utils.py b/aria/orchestrator/workflows/builtin/utils.py index 2254d13..e649006 100644 --- a/aria/orchestrator/workflows/builtin/utils.py +++ b/aria/orchestrator/workflows/builtin/utils.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ..api.task import OperationTask, StubTask +from ..api.task import OperationTask from .. import exceptions @@ -23,12 +23,10 @@ def create_node_task(node, interface_name, operation_name, **kwargs): """ try: - if _is_empty_task(node, interface_name, operation_name): - return StubTask() - return OperationTask.for_node(node=node, interface_name=interface_name, operation_name=operation_name, + is_stub=_is_empty_task(node, interface_name, operation_name), **kwargs) except exceptions.OperationNotFoundException: # We will skip nodes which do not have the operation @@ -71,29 +69,29 @@ def relationship_tasks(relationship, interface_name, source_operation_name=None, operations = [] if source_operation_name: try: - if _is_empty_task(relationship, interface_name, source_operation_name): - operations.append(StubTask()) - else: - operations.append( - OperationTask.for_relationship(relationship=relationship, - interface_name=interface_name, - operation_name=source_operation_name, - **kwargs) + operations.append( + OperationTask.for_relationship( + relationship=relationship, + interface_name=interface_name, + operation_name=source_operation_name, + is_stub=_is_empty_task(relationship, interface_name, source_operation_name), + **kwargs ) + ) except exceptions.OperationNotFoundException: # We will skip relationships which do not have the operation pass if target_operation_name: try: - if _is_empty_task(relationship, interface_name, 
target_operation_name): - operations.append(StubTask()) - else: - operations.append( - OperationTask.for_relationship(relationship=relationship, - interface_name=interface_name, - operation_name=target_operation_name, - **kwargs) + operations.append( + OperationTask.for_relationship( + relationship=relationship, + interface_name=interface_name, + operation_name=target_operation_name, + is_stub=_is_empty_task(relationship, interface_name, target_operation_name), + **kwargs ) + ) except exceptions.OperationNotFoundException: # We will skip relationships which do not have the operation pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55366e64/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 97c4999..51531f1 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -110,13 +110,11 @@ class Engine(logger.LoggerMixin): self._workflow_context.model.task.refresh(task.model_task) yield task - def _handle_executable_task(self, task): - # import pydevd; pydevd.settrace('localhost', suspend=False) - if isinstance(task, engine_task.StubTask): - task.status = models.Task.SUCCESS - else: + @staticmethod + def _handle_executable_task(task): + if not isinstance(task, engine_task.MarkerTaskBase): events.sent_task_signal.send(task) - task.execute() + task.execute() def _handle_ended_tasks(self, task): if task.status == models.Task.FAILED and not task.ignore_failure: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55366e64/aria/orchestrator/workflows/core/events_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py index b43082b..83b79d5 100644 --- 
a/aria/orchestrator/workflows/core/events_handler.py +++ b/aria/orchestrator/workflows/core/events_handler.py @@ -41,6 +41,9 @@ def _task_started(task, *args, **kwargs): task.started_at = datetime.utcnow() task.status = task.STARTED +@events.start_task_signal.connect +def _node_task_started(task, *args, **kwargs): + with task._update(): _update_node_state_if_necessary(task, is_transitional=True) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55366e64/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py index 548bf47..4a0b862 100644 --- a/aria/orchestrator/workflows/core/task.py +++ b/aria/orchestrator/workflows/core/task.py @@ -64,15 +64,17 @@ class BaseTask(object): return self._id -class StubTask(BaseTask): +class MarkerTaskBase(BaseTask): """ - Base stub task for all tasks that don't actually run anything + Base stub task for marker user tasks that only mark the start/end of a workflow + or sub-workflow """ - def __init__(self, *args, **kwargs): - super(StubTask, self).__init__(executor=dry.StubExecutor(), *args, **kwargs) + super(MarkerTaskBase, self).__init__(executor=dry.MarkerExecutor(), *args, **kwargs) self.status = models.Task.PENDING self.due_at = datetime.utcnow() + self.started_at = None + self.ended_at = None def has_ended(self): return self.status in (models.Task.SUCCESS, models.Task.FAILED) @@ -80,29 +82,37 @@ class StubTask(BaseTask): def is_waiting(self): return self.status in (models.Task.PENDING, models.Task.RETRYING) + def end(self): + self.ended_at = datetime.utcnow() + self.status = models.Task.SUCCESS + + def start(self): + self.started_at = datetime.utcnow() + self.status = models.Task.STARTED + -class StartWorkflowTask(StubTask): +class StartWorkflowTask(MarkerTaskBase): """ Task marking a workflow start """ pass -class EndWorkflowTask(StubTask): +class 
EndWorkflowTask(MarkerTaskBase): """ Task marking a workflow end """ pass -class StartSubWorkflowTask(StubTask): +class StartSubWorkflowTask(MarkerTaskBase): """ Task marking a subworkflow start """ pass -class EndSubWorkflowTask(StubTask): +class EndSubWorkflowTask(MarkerTaskBase): """ Task marking a subworkflow end """ @@ -113,15 +123,18 @@ class OperationTask(BaseTask): """ Operation task """ + def __init__(self, api_task, executor=None, *args, **kwargs): + # If no executor is provided, we infer that this is a stub task which does not need to be + # executed. + super(OperationTask, self).__init__( + id=api_task.id, executor=executor or dry.StubExecutor(), *args, **kwargs) - def __init__(self, api_task, *args, **kwargs): - super(OperationTask, self).__init__(id=api_task.id, **kwargs) self._workflow_context = api_task._workflow_context self.interface_name = api_task.interface_name self.operation_name = api_task.operation_name model_storage = api_task._workflow_context.model - plugin = api_task.plugin + # This currently signals that this is a stub task base_task_model = model_storage.task.model_cls if isinstance(api_task.actor, models.Node): context_cls = operation_context.NodeOperationContext @@ -141,15 +154,18 @@ class OperationTask(BaseTask): task_model = create_task_model( name=api_task.name, - implementation=api_task.implementation, actor=api_task.actor, - inputs=api_task.inputs, status=base_task_model.PENDING, max_attempts=api_task.max_attempts, retry_interval=api_task.retry_interval, ignore_failure=api_task.ignore_failure, - plugin=plugin, - execution=self._workflow_context.execution + execution=self._workflow_context.execution, + + # Only non-stub tasks have these fields + plugin=getattr(api_task, 'plugin', None), + implementation=getattr(api_task, 'implementation', None), + inputs=getattr(api_task, 'inputs', {}), + ) self._workflow_context.model.task.put(task_model) 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55366e64/aria/orchestrator/workflows/core/translation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py index 1ae59a3..487d44d 100644 --- a/aria/orchestrator/workflows/core/translation.py +++ b/aria/orchestrator/workflows/core/translation.py @@ -49,9 +49,12 @@ def build_execution_graph( default=[start_task]) if isinstance(api_task, api.task.OperationTask): - # Add the task an the dependencies - operation_task = core_task.OperationTask(api_task, executor=executor) + if api_task.is_stub: + operation_task = core_task.OperationTask(api_task) + else: + operation_task = core_task.OperationTask(api_task, executor=executor) _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies) + elif isinstance(api_task, api.task.WorkflowTask): # Build the graph recursively while adding start and end markers build_execution_graph( @@ -62,9 +65,6 @@ def build_execution_graph( end_cls=core_task.EndSubWorkflowTask, depends_on=operation_dependencies ) - elif isinstance(api_task, api.task.StubTask): - stub_task = core_task.StubTask(id=api_task.id) - _add_task_and_dependencies(execution_graph, stub_task, operation_dependencies) else: raise RuntimeError('Undefined state') @@ -88,8 +88,7 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()): Returns task list from dependencies. 
""" return [execution_graph.node[dependency.id - if isinstance(dependency, (api.task.OperationTask, - api.task.StubTask)) + if isinstance(dependency, api.task.OperationTask) else _end_graph_suffix(dependency.id)]['task'] for dependency in dependencies] or default http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55366e64/aria/orchestrator/workflows/executor/dry.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py index 5be8015..55d8f98 100644 --- a/aria/orchestrator/workflows/executor/dry.py +++ b/aria/orchestrator/workflows/executor/dry.py @@ -17,42 +17,32 @@ Dry executor """ -from datetime import datetime - from aria.orchestrator import events from .base import BaseExecutor # TODO: the name of this module should definitely change -class StubExecutor(BaseExecutor): - - @staticmethod - def _task_sent(*args, **kwargs): - pass - - @staticmethod - def _task_started(task): - events.start_task_signal.send(task, skip_logging=True) - - @staticmethod - def _task_succeeded(task): - events.on_success_task_signal.send(task, skip_logging=True) +class MarkerExecutor(BaseExecutor): + def execute(self, task): + task.start() + task.end() - @staticmethod - def _task_failed(*args, **kwargs): - pass +class StubExecutor(BaseExecutor): def execute(self, task): - pass + events.start_task_signal.send(task) + events.on_success_task_signal.send(task) -class DryExecutor(StubExecutor): +class DryExecutor(BaseExecutor): """ Executor which dry runs tasks - prints task information without causing any side effects """ def execute(self, task): + events.start_task_signal.send(task, skip_logging=True) + if hasattr(task.actor, 'source_node'): name = '{source_node.name}->{target_node.name}'.format( source_node=task.actor.source_node, target_node=task.actor.target_node) @@ -66,3 +56,5 @@ class DryExecutor(StubExecutor): task.context.logger.info( '<dry> {name} 
{task.interface_name}.{task.operation_name} successful' .format(name=name, task=task)) + + events.on_success_task_signal.send(task, skip_logging=True) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55366e64/tests/end2end/test_hello_world.py ---------------------------------------------------------------------- diff --git a/tests/end2end/test_hello_world.py b/tests/end2end/test_hello_world.py index 8895593..dc8a2a3 100644 --- a/tests/end2end/test_hello_world.py +++ b/tests/end2end/test_hello_world.py @@ -30,8 +30,7 @@ def test_hello_world(testenv): # Even if some assertions failed, attempt to execute uninstall so the # webserver process doesn't stay up once the test is finished # TODO: remove force_service_delete=True - pass - # testenv.uninstall_service(force_service_delete=True) + testenv.uninstall_service(force_service_delete=True) _verify_webserver_down('http://localhost:9090') testenv.verify_clean_storage() @@ -58,5 +57,5 @@ def _verify_deployed_service_in_storage(service_name, model_storage): assert service.name == service_name assert len(service.executions) == 1 assert len(service.nodes) == 2 - # TODO: validate node states + assert all(node.state == node.STARTED for node in service.nodes.values()) assert len(service.executions[0].logs) > 0 http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55366e64/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py index 514bce9..16cb47d 100644 --- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py +++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py @@ -17,6 +17,7 @@ from networkx import topological_sort, DiGraph from aria.orchestrator import context from aria.orchestrator.workflows import 
api, core +from aria.orchestrator.workflows.executor import dry from tests import mock from tests import storage @@ -63,7 +64,8 @@ def test_task_graph_into_execution_graph(tmpdir): # Direct check execution_graph = DiGraph() core.translation.build_execution_graph(task_graph=test_task_graph, - execution_graph=execution_graph) + execution_graph=execution_graph, + executor=dry.MarkerExecutor()) execution_tasks = topological_sort(execution_graph) assert len(execution_tasks) == 7