Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-105-integrate-modeling 7d20a8488 -> 9876d94e0
Final fixes Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/9876d94e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/9876d94e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/9876d94e Branch: refs/heads/ARIA-105-integrate-modeling Commit: 9876d94e0b0b688dd757bd3586b15c8e01d73aed Parents: 7d20a84 Author: Tal Liron <[email protected]> Authored: Tue Mar 21 12:35:46 2017 -0500 Committer: Tal Liron <[email protected]> Committed: Tue Mar 21 12:35:46 2017 -0500 ---------------------------------------------------------------------- aria/orchestrator/workflows/api/task.py | 111 +++++++++++---------- aria/orchestrator/workflows/builtin/utils.py | 10 +- aria/orchestrator/workflows/exceptions.py | 10 +- tests/orchestrator/workflows/api/test_task.py | 33 ++++-- 4 files changed, 95 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9876d94e/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index 98016ca..9522d7a 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -62,26 +62,23 @@ class OperationTask(BaseTask): def __init__(self, actor, - name=None, - actor_type=None, - interface_name=None, - operation_name=None, - implementation=None, + actor_type, + interface_name, + operation_name, + runs_on=None, max_attempts=None, retry_interval=None, ignore_failure=None, - inputs=None, - runs_on=None): + inputs=None): """ - Creates an operation task using the name, details, node instance and any additional kwargs. - - :param name: the name of the operation. - :param actor: the operation host on which this operation is registered. - :param inputs: operation inputs. + Do not call this constructor directly. Instead, use :meth:`for_node` or + :meth:`for_relationship`. """ assert isinstance(actor, (models.Node, models.Relationship)) - assert (runs_on is None) or (runs_on in models.Task.RUNS_ON) + assert actor_type in ('node', 'relationship') + assert interface_name and operation_name + assert runs_on in models.Task.RUNS_ON super(OperationTask, self).__init__() self.actor = actor @@ -99,37 +96,31 @@ class OperationTask(BaseTask): if not isinstance(v, models.Parameter): inputs[k] = models.Parameter.wrap(k, v) - # TODO: Suggestion: these extra inputs should likely be stored as a separate entry in the - # task model, because they are different from the operation inputs. The two kinds of inputs - # should also not be merged. - - if interface_name or operation_name: - operation = self._get_operation(interface_name, operation_name) - if operation is None: - raise exceptions.TaskCreationException( - 'Could not find operation "{0}" on interface "{1}" for {2} "{3}"' + # TODO: Suggestion: these extra inputs could be stored as a separate entry in the task + # model, because they are different from the operation inputs. If we do this, then the two + # kinds of inputs should *not* be merged here. + + operation = self._get_operation(interface_name, operation_name) + if operation is None: + raise exceptions.OperationNotFoundException( + 'Could not find operation "{0}" on interface "{1}" for {2} "{3}"' + .format(operation_name, interface_name, actor_type, actor.name)) + + self.plugin = None + if operation.plugin_specification: + self.plugin = OperationTask._find_plugin(operation.plugin_specification) + if self.plugin is None: + raise exceptions.PluginNotFoundException( + 'Could not find plugin of operation "{0}" on interface "{1}" for {2} "{3}"' .format(operation_name, interface_name, actor_type, actor.name)) - self.plugin = None - if operation.plugin_specification: - self.plugin = OperationTask._find_plugin(operation.plugin_specification) - if self.plugin is None: - raise exceptions.TaskCreationException( - 'Could not find plugin of operation "{0}" on interface "{1}" for {2} "{3}"' - .format(operation_name, interface_name, actor_type, actor.name)) - - self.implementation = operation.implementation - self.inputs = OperationTask._merge_inputs(operation.inputs, inputs) - - self.name = OperationTask.NAME_FORMAT.format(type=actor_type, - name=actor.name, - interface=interface_name, - operation=operation_name) - else: - self.name = name - self.implementation = implementation - self.inputs = inputs or {} - self.plugin = None + self.implementation = operation.implementation + self.inputs = OperationTask._merge_inputs(operation.inputs, inputs) + + self.name = OperationTask.NAME_FORMAT.format(type=actor_type, + name=actor.name, + interface=interface_name, + operation=operation_name) @classmethod def for_node(cls, @@ -143,10 +134,16 @@ class OperationTask(BaseTask): """ Creates an operation on a node. - :param node: the node of which this operation belongs to. - :param interface_name: the name of the interface. - :param operation_name: the name of the operation. - :param inputs: any additional inputs to the operation + :param node: The node on which to run the operation + :param interface_name: The interface name + :param operation_name: The operation name within the interface + :param max_attempts: The maximum number of attempts in case the operation fails + (if not specified the defaults it taken from the workflow context) + :param retry_interval: The interval in seconds between attempts when the operation fails + (if not specified the defaults it taken from the workflow context) + :param ignore_failure: Whether to ignore failures + (if not specified the defaults it taken from the workflow context) + :param inputs: Additional operation inputs """ assert isinstance(node, models.Node) @@ -166,19 +163,25 @@ class OperationTask(BaseTask): relationship, interface_name, operation_name, + runs_on=models.Task.RUNS_ON_SOURCE, max_attempts=None, retry_interval=None, ignore_failure=None, - inputs=None, - runs_on=models.Task.RUNS_ON_SOURCE): + inputs=None): """ Creates an operation on a relationship edge. - :param relationship: the relationship of which this operation belongs to. - :param interface_name: the name of the interface. - :param operation_name: the name of the operation. - :param inputs: any additional inputs to the operation + :param relationship: The relationship on which to run the operation + :param interface_name: The interface name + :param operation_name: The operation name within the interface :param runs_on: where to run the operation ("source" or "target"); defaults to "source" + :param max_attempts: The maximum number of attempts in case the operation fails + (if not specified the defaults it taken from the workflow context) + :param retry_interval: The interval in seconds between attempts when the operation fails + (if not specified the defaults it taken from the workflow context) + :param ignore_failure: Whether to ignore failures + (if not specified the defaults it taken from the workflow context) + :param inputs: Additional operation inputs """ assert isinstance(relationship, models.Relationship) @@ -188,11 +191,11 @@ class OperationTask(BaseTask): actor_type='relationship', interface_name=interface_name, operation_name=operation_name, + runs_on=runs_on, max_attempts=max_attempts, retry_interval=retry_interval, ignore_failure=ignore_failure, - inputs=inputs, - runs_on=runs_on) + inputs=inputs) def _get_operation(self, interface_name, operation_name): interface = self.actor.interfaces.get(interface_name) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9876d94e/aria/orchestrator/workflows/builtin/utils.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/utils.py b/aria/orchestrator/workflows/builtin/utils.py index 6fc1ffe..8efa889 100644 --- a/aria/orchestrator/workflows/builtin/utils.py +++ b/aria/orchestrator/workflows/builtin/utils.py @@ -26,7 +26,8 @@ def create_node_task(interface_name, operation_name, node): return OperationTask.for_node(node=node, interface_name=interface_name, operation_name=operation_name) - except exceptions.TaskCreationException: + except exceptions.OperationNotFoundException: + # We will skip nodes which do not have the operation return None @@ -44,7 +45,8 @@ def create_relationship_tasks(interface_name, operation_name, runs_on, node): interface_name=interface_name, operation_name=operation_name, runs_on=runs_on)) - except exceptions.TaskCreationException: + except exceptions.OperationNotFoundException: + # We will skip relationships which do not have the operation pass return sequence @@ -54,9 +56,9 @@ def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False): Creates dependencies between tasks if there is a relationship (outbound) between their nodes. """ - def get_task(node_id): + def get_task(node_name): for task, node in tasks_and_nodes: - if node.name == node_id: + if node.name == node_name: return task return None http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9876d94e/aria/orchestrator/workflows/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/exceptions.py b/aria/orchestrator/workflows/exceptions.py index f624fec..4fb8dd7 100644 --- a/aria/orchestrator/workflows/exceptions.py +++ b/aria/orchestrator/workflows/exceptions.py @@ -70,7 +70,13 @@ class TaskException(exceptions.AriaError): """ -class TaskCreationException(TaskException): +class OperationNotFoundException(TaskException): """ - Raised by the task + Could not find an operation on the node or relationship. + """ + + +class PluginNotFoundException(TaskException): + """ + Could not find a plugin matching the plugin specification. """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9876d94e/tests/orchestrator/workflows/api/test_task.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/api/test_task.py b/tests/orchestrator/workflows/api/test_task.py index e749b78..b635a88 100644 --- a/tests/orchestrator/workflows/api/test_task.py +++ b/tests/orchestrator/workflows/api/test_task.py @@ -55,7 +55,7 @@ class TestOperationTask(object): implementation='op_path')) node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME) - node.interfaces = {interface.name: interface} + node.interfaces[interface_name] = interface node.plugin_specifications[plugin_specification.name] = plugin_specification ctx.model.node.update(node) inputs = {'test_input': True} @@ -186,21 +186,36 @@ class TestOperationTask(object): assert api_task.runs_on == models.Task.RUNS_ON_TARGET def test_operation_task_default_values(self, ctx): - dependency_node = ctx.model.node.get_by_name( - mock.models.DEPENDENCY_NODE_NAME) + interface_name = 'test_interface' + operation_name = 'create' + + plugin = mock.models.create_plugin('package', '0.1') + ctx.model.node.update(plugin) + + plugin_specification = mock.models.create_plugin_specification('package', '0.1') + + dependency_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + + interface = mock.models.create_interface( + ctx.service, + interface_name, + operation_name, + operation_kwargs=dict(plugin_specification=plugin_specification, + implementation='op_path')) + dependency_node.interfaces[interface_name] = interface with context.workflow.current.push(ctx): - task = api.task.OperationTask( - name='stub', - implementation='', - actor=dependency_node) + task = api.task.OperationTask.for_node( + node=dependency_node, + interface_name=interface_name, + operation_name=operation_name) assert task.inputs == {} assert task.retry_interval == ctx._task_retry_interval assert task.max_attempts == ctx._task_max_attempts assert task.ignore_failure == ctx._task_ignore_failure - assert task.plugin is None - assert task.runs_on is None + assert task.plugin is plugin + assert task.runs_on == models.Task.RUNS_ON_NODE class TestWorkflowTask(object):
