Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-163-Update-node-state-for-stub-tasks 95255a790 -> 43da804c5
review2 Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/43da804c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/43da804c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/43da804c Branch: refs/heads/ARIA-163-Update-node-state-for-stub-tasks Commit: 43da804c52fb0af2d1eb97d71059e3d08e676150 Parents: 95255a7 Author: max-orlov <[email protected]> Authored: Wed May 3 16:57:19 2017 +0300 Committer: max-orlov <[email protected]> Committed: Wed May 3 16:57:19 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflows/api/task.py | 19 +++++++----- aria/orchestrator/workflows/core/engine.py | 4 +-- aria/orchestrator/workflows/core/task.py | 31 ++++++++++---------- aria/orchestrator/workflows/core/translation.py | 8 ++--- aria/orchestrator/workflows/events_logging.py | 20 ++++++------- aria/orchestrator/workflows/executor/base.py | 5 ++-- aria/orchestrator/workflows/executor/dry.py | 14 +++++++-- tests/end2end/test_hello_world.py | 3 +- .../test_task_graph_into_execution_graph.py | 2 +- 9 files changed, 55 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index cb1618c..d376dd2 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -92,14 +92,17 @@ class OperationTask(BaseTask): name=actor.name, interface=self.interface_name, operation=self.operation_name) - self.is_stub = self.is_empty(self.actor, self.interface_name, self.operation_name) - if self.is_stub: - return - - operation = self.actor.interfaces[self.interface_name].operations[self.operation_name] - self.plugin = operation.plugin - self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs) - self.implementation = operation.implementation + self.is_empty = self.is_empty(self.actor, self.interface_name, self.operation_name) + + if self.is_empty: + self.plugin = None + self.inputs = {} + self.implementation = None + else: + operation = self.actor.interfaces[self.interface_name].operations[self.operation_name] + self.plugin = operation.plugin + self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs) + self.implementation = operation.implementation def __repr__(self): return self.name http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index a4498aa..6a66eb1 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -45,7 +45,7 @@ class Engine(logger.LoggerMixin): self._executor = executor translation.build_execution_graph(task_graph=tasks_graph, execution_graph=self._execution_graph, - executor=self._executor) + default_executor=self._executor) def execute(self): """ @@ -112,8 +112,6 @@ class Engine(logger.LoggerMixin): @staticmethod def _handle_executable_task(task): - if not isinstance(task, engine_task.StubTask): - events.sent_task_signal.send(task) task.execute() def _handle_ended_tasks(self, task): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py index 05e3365..2a2f010 100644 --- a/aria/orchestrator/workflows/core/task.py +++ b/aria/orchestrator/workflows/core/task.py @@ -25,10 +25,11 @@ from functools import ( ) +from ... import events from ....modeling import models from ...context import operation as operation_context -from .. import exceptions from ..executor import base +from .. import exceptions def _locked(func=None): @@ -54,6 +55,7 @@ class BaseTask(object): self._executor = executor def execute(self): + events.sent_task_signal.send(self) self._executor.execute(self) @property @@ -69,22 +71,20 @@ class StubTask(BaseTask): Base stub task for marker user tasks that only mark the start/end of a workflow or sub-workflow """ + STARTED = models.Task.STARTED + SUCCESS = models.Task.SUCCESS + def __init__(self, *args, **kwargs): - super(StubTask, self).__init__(executor=base.StubExecutor(), *args, **kwargs) + super(StubTask, self).__init__(executor=base.StubTaskExecutor(), *args, **kwargs) self.status = models.Task.PENDING self.due_at = datetime.utcnow() def has_ended(self): - return self.status in (models.Task.SUCCESS, models.Task.FAILED) - - def is_waiting(self): - return self.status in (models.Task.PENDING, models.Task.RETRYING) - - def end(self): - self.status = models.Task.SUCCESS + return self.status == self.SUCCESS - def start(self): - self.status = models.Task.STARTED + @staticmethod + def is_waiting(): + return False class StartWorkflowTask(StubTask): @@ -120,7 +120,7 @@ class OperationTask(BaseTask): Operation task """ def __init__(self, api_task, executor=None, *args, **kwargs): - # If no executor is provided, we defer that this is a stub task which does not need to be + # If no executor is provided, we infer that this is an empty task which does not need to be # executed. super(OperationTask, self).__init__( id=api_task.id, executor=executor or base.EmptyOperationExecutor(), *args, **kwargs) @@ -130,7 +130,6 @@ class OperationTask(BaseTask): self.operation_name = api_task.operation_name model_storage = api_task._workflow_context.model - # This currently signal that this is a stub task base_task_model = model_storage.task.model_cls if isinstance(api_task.actor, models.Node): context_cls = operation_context.NodeOperationContext @@ -152,9 +151,9 @@ class OperationTask(BaseTask): execution=self._workflow_context.execution, # Only non-stub tasks have these fields - plugin=getattr(api_task, 'plugin', None), - implementation=getattr(api_task, 'implementation', None), - inputs=getattr(api_task, 'inputs', {}), + plugin=api_task.plugin, + implementation=api_task.implementation, + inputs=api_task.inputs ) self._workflow_context.model.task.put(task_model) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/core/translation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py index 487d44d..d764024 100644 --- a/aria/orchestrator/workflows/core/translation.py +++ b/aria/orchestrator/workflows/core/translation.py @@ -24,7 +24,7 @@ from . import task as core_task def build_execution_graph( task_graph, execution_graph, - executor, + default_executor, start_cls=core_task.StartWorkflowTask, end_cls=core_task.EndWorkflowTask, depends_on=()): @@ -49,10 +49,10 @@ def build_execution_graph( default=[start_task]) if isinstance(api_task, api.task.OperationTask): - if api_task.is_stub: + if api_task.is_empty: operation_task = core_task.OperationTask(api_task) else: - operation_task = core_task.OperationTask(api_task, executor=executor) + operation_task = core_task.OperationTask(api_task, executor=default_executor) _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies) elif isinstance(api_task, api.task.WorkflowTask): @@ -60,7 +60,7 @@ def build_execution_graph( build_execution_graph( task_graph=api_task, execution_graph=execution_graph, - executor=executor, + default_executor=default_executor, start_cls=core_task.StartSubWorkflowTask, end_cls=core_task.EndSubWorkflowTask, depends_on=operation_dependencies http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/events_logging.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py index fa993d0..69b5b7a 100644 --- a/aria/orchestrator/workflows/events_logging.py +++ b/aria/orchestrator/workflows/events_logging.py @@ -34,25 +34,23 @@ def _get_task_name(task): @events.start_task_signal.connect -def _start_task_handler(task, skip_logging=False, **kwargs): - if skip_logging: - return - task.context.logger.info('{name} {task.interface_name}.{task.operation_name} started...' - .format(name=_get_task_name(task), task=task)) +def _start_task_handler(task, **kwargs): + if task.is_empty: + task.context.logger.debug('{name} {task.interface_name}.{task.operation_name} has no ' + 'implementation'.format(name=_get_task_name(task), task=task)) + else: + task.context.logger.info('{name} {task.interface_name}.{task.operation_name} started...' + .format(name=_get_task_name(task), task=task)) @events.on_success_task_signal.connect -def _success_task_handler(task, skip_logging=False, **kwargs): - if skip_logging: - return +def _success_task_handler(task, **kwargs): task.context.logger.info('{name} {task.interface_name}.{task.operation_name} successful' .format(name=_get_task_name(task), task=task)) @events.on_failure_task_signal.connect -def _failure_operation_handler(task, traceback, skip_logging=False, **kwargs): - if skip_logging: - return +def _failure_operation_handler(task, traceback, **kwargs): task.context.logger.error( '{name} {task.interface_name}.{task.operation_name} failed' .format(name=_get_task_name(task), task=task), extra=dict(traceback=traceback) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py index 3a3a1fe..0194ee7 100644 --- a/aria/orchestrator/workflows/executor/base.py +++ b/aria/orchestrator/workflows/executor/base.py @@ -56,10 +56,9 @@ class BaseExecutor(logger.LoggerMixin): events.on_success_task_signal.send(task) -class StubExecutor(BaseExecutor): +class StubTaskExecutor(BaseExecutor): def execute(self, task): - task.start() - task.end() + task.status = task.SUCCESS class EmptyOperationExecutor(BaseExecutor): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/aria/orchestrator/workflows/executor/dry.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py index 7da48f3..eb70a41 100644 --- a/aria/orchestrator/workflows/executor/dry.py +++ b/aria/orchestrator/workflows/executor/dry.py @@ -16,8 +16,8 @@ """ Dry executor """ +from datetime import datetime -from aria.orchestrator import events from .base import BaseExecutor @@ -27,7 +27,11 @@ class DryExecutor(BaseExecutor): """ def execute(self, task): - events.start_task_signal.send(task, skip_logging=True) + # updating the task manually instead of calling self._task_started(task), + # to avoid any side effects raising that event might cause + with task._update(): + task.started_at = datetime.utcnow() + task.status = task.STARTED if hasattr(task.actor, 'source_node'): name = '{source_node.name}->{target_node.name}'.format( @@ -43,4 +47,8 @@ class DryExecutor(BaseExecutor): '<dry> {name} {task.interface_name}.{task.operation_name} successful' .format(name=name, task=task)) - events.on_success_task_signal.send(task, skip_logging=True) + # updating the task manually instead of calling self._task_succeeded(task), + # to avoid any side effects raising that event might cause + with task._update(): + task.ended_at = datetime.utcnow() + task.status = task.SUCCESS http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/tests/end2end/test_hello_world.py ---------------------------------------------------------------------- diff --git a/tests/end2end/test_hello_world.py b/tests/end2end/test_hello_world.py index dc8a2a3..fc5f631 100644 --- a/tests/end2end/test_hello_world.py +++ b/tests/end2end/test_hello_world.py @@ -29,8 +29,7 @@ def test_hello_world(testenv): finally: # Even if some assertions failed, attempt to execute uninstall so the # webserver process doesn't stay up once the test is finished - # TODO: remove force_service_delete=True - testenv.uninstall_service(force_service_delete=True) + testenv.uninstall_service() _verify_webserver_down('http://localhost:9090') testenv.verify_clean_storage() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/43da804c/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py index 5dc1206..2a96d01 100644 --- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py +++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py @@ -68,7 +68,7 @@ def test_task_graph_into_execution_graph(tmpdir): execution_graph = DiGraph() core.translation.build_execution_graph(task_graph=test_task_graph, execution_graph=execution_graph, - executor=base.StubExecutor()) + default_executor=base.StubTaskExecutor()) execution_tasks = topological_sort(execution_graph) assert len(execution_tasks) == 7
