Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-92-plugin-in-implementation-string 669356657 -> dabfc307c
Cleanups Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/dabfc307 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/dabfc307 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/dabfc307 Branch: refs/heads/ARIA-92-plugin-in-implementation-string Commit: dabfc307c0d4aa7dbbf56ca0c979b02777b71c78 Parents: 6693566 Author: Tal Liron <tal.li...@gmail.com> Authored: Tue Apr 4 17:56:52 2017 -0500 Committer: Tal Liron <tal.li...@gmail.com> Committed: Tue Apr 4 17:56:52 2017 -0500 ---------------------------------------------------------------------- aria/cli/dry.py | 4 +- aria/modeling/orchestration.py | 54 +++++- aria/modeling/service_instance.py | 80 +++++--- aria/modeling/service_template.py | 14 +- aria/orchestrator/execution_plugin/__init__.py | 156 --------------- .../execution_plugin/instantiation.py | 188 +++++++++++++++++++ .../execution_plugin/ssh/operations.py | 2 + aria/orchestrator/workflows/api/task.py | 61 ++---- aria/parser/consumption/modeling.py | 12 +- .../custom_types/elasticsearch.yaml | 2 + .../multi-tier-1/custom_types/kibana.yaml | 2 + .../multi-tier-1/custom_types/logstash.yaml | 2 + .../paypalpizzastore_nodejs_app.yaml | 2 +- .../webserver-dbms-2/webserver-dbms-2.yaml | 6 +- .../simple_v1_0/modeling/capabilities.py | 5 + tests/modeling/test_models.py | 65 ++++--- tests/orchestrator/context/test_operation.py | 3 +- tests/orchestrator/context/test_serialize.py | 3 +- tests/orchestrator/execution_plugin/test_ssh.py | 17 +- tests/orchestrator/workflows/api/test_task.py | 24 +-- .../workflows/builtin/test_execute_operation.py | 3 +- tests/orchestrator/workflows/core/test_task.py | 29 ++- .../test_task_graph_into_exececution_graph.py | 3 +- .../node-cellar/node-cellar.yaml | 2 +- 24 files changed, 413 insertions(+), 326 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/aria/cli/dry.py ---------------------------------------------------------------------- diff --git a/aria/cli/dry.py b/aria/cli/dry.py index 82faf42..9c38a85 100644 --- a/aria/cli/dry.py +++ b/aria/cli/dry.py @@ -49,8 +49,8 @@ def convert_operation_to_dry(oper): Converts a single :class:`Operation` to run dryly. """ - plugin = oper.plugin_specification.name \ - if oper.plugin_specification is not None else None + plugin = oper.plugin.name \ + if oper.plugin is not None else None if oper.inputs is None: oper.inputs = OrderedDict() oper.inputs['_implementation'] = models.Parameter(name='_implementation', http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index b5c735d..d29f9f8 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -200,7 +200,44 @@ class PluginBase(ModelMixin): class TaskBase(ModelMixin): """ - A Model which represents an task + Represents the smallest unit of stateful execution in ARIA. The task state includes inputs, + outputs, as well as an atomic status, ensuring that the task can only be running once at any + given time. + + Tasks may be "one shot" or may be configured to run repeatedly in the case of failure. + + Tasks are often based on :class:`Operation`, and thus act on either a :class:`Node` or a + :class:`Relationship`, however this is not required. + + :ivar node: The node actor (optional) + :vartype node: :class:`Node` + :ivar relationship: The relationship actor (optional) + :vartype relationship: :class:`Relationship` + :ivar plugin: The implementing plugin (set to None for default execution plugin) + :vartype plugin: :class:`Plugin` + :ivar inputs: Parameters that can be used by this task + :vartype inputs: {basestring: :class:`Parameter`} + :ivar implementation: Python path to an ``@operation`` function + :vartype implementation: basestring + :ivar max_attempts: Maximum number of retries allowed in case of failure + :vartype max_attempts: int + :ivar retry_interval: Interval between retries (in seconds) + :vartype retry_interval: int + :ivar ignore_failure: Set to True to ignore failures + :vartype ignore_failure: bool + :ivar due_at: Timestamp to start the task + :vartype due_at: datetime + :ivar execution: Assigned execution + :vartype execution: :class:`Execution` + :ivar status: Current atomic status ('pending', 'retrying', 'sent', 'started', 'success', + 'failed') + :vartype status: basestring + :ivar started_at: Timestamp for when task started + :vartype started_at: datetime + :ivar ended_at: Timestamp for when task ended + :vartype ended_at: datetime + :ivar retry_count: How many retries occurred + :vartype retry_count: int """ __tablename__ = 'task' @@ -253,18 +290,17 @@ class TaskBase(ModelMixin): def inputs(cls): return relationship.many_to_many(cls, 'parameter', prefix='inputs', dict_key='name') - status = Column(Enum(*STATES, name='status'), default=PENDING) - - due_at = Column(DateTime, nullable=False, index=True, default=datetime.utcnow()) - started_at = Column(DateTime, default=None) - ended_at = Column(DateTime, default=None) + implementation = Column(String) max_attempts = Column(Integer, default=1) - retry_count = Column(Integer, default=0) retry_interval = Column(Float, default=0) ignore_failure = Column(Boolean, default=False) + due_at = Column(DateTime, nullable=False, index=True, default=datetime.utcnow()) - implementation = Column(String) - configuration = Column(modeling_types.StrictDict(key_cls=basestring)) + # State + status = Column(Enum(*STATES, name='status'), default=PENDING) + started_at = Column(DateTime, default=None) + ended_at = Column(DateTime, default=None) + retry_count = Column(Integer, default=0) @property def actor(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/aria/modeling/service_instance.py ---------------------------------------------------------------------- diff --git a/aria/modeling/service_instance.py b/aria/modeling/service_instance.py index ecfae2c..6130442 100644 --- a/aria/modeling/service_instance.py +++ b/aria/modeling/service_instance.py @@ -28,6 +28,7 @@ from .mixins import InstanceModelMixin from ..parser import validation from ..parser.consumption import ConsumptionContext from ..utils import collections, formatting, console +from ..orchestrator.execution_plugin.instantiation import configure_operation from . import ( relationship, utils, @@ -184,6 +185,12 @@ class ServiceBase(InstanceModelMixin): # pylint: disable=too-many-public-methods for node in self.nodes.itervalues(): node.find_host() + def configure_operations(self): + for node in self.nodes.itervalues(): + node.configure_operations() + for operation in self.workflows.itervalues(): + operation.configure() + def is_node_a_target(self, target_node): for node in self.nodes.itervalues(): if self._is_node_a_target(node, target_node): @@ -377,7 +384,7 @@ class NodeBase(InstanceModelMixin): # pylint: disable=too-many-public-methods child_property='target_node') @declared_attr - def host(cls): + def host(cls): # pylint: disable=method-hidden return relationship.one_to_one_self(cls, 'host_fk') # region orchestration @@ -390,9 +397,10 @@ class NodeBase(InstanceModelMixin): # pylint: disable=too-many-public-methods __mapper_args__ = {'version_id_col': version} # Enable SQLAlchemy automatic version counting @property - def ip(self): - if self.host and self.host.runtime_properties: - return self.host.runtime_properties.get('ip') + def host_address(self): + if self.host: + if self.host.runtime_properties: + return self.host.runtime_properties.get('host_address') return None # endregion @@ -520,22 +528,28 @@ class NodeBase(InstanceModelMixin): # pylint: disable=too-many-public-methods def _find_host(node): if node.type.role == 'host': return node - for relationship in node.outbound_relationships: - if (relationship.target_capability is not None) and \ - relationship.target_capability.type.role == 'host': - host = _find_host(relationship.target_node) + for the_relationship in node.outbound_relationships: + if (the_relationship.target_capability is not None) and \ + the_relationship.target_capability.type.role == 'host': + host = _find_host(the_relationship.target_node) if host is not None: return host - for relationship in node.inbound_relationships: - if (relationship.target_capability is not None) and \ - relationship.target_capability.type.role == 'feature': - host = _find_host(relationship.source_node) + for the_relationship in node.inbound_relationships: + if (the_relationship.target_capability is not None) and \ + the_relationship.target_capability.type.role == 'feature': + host = _find_host(the_relationship.source_node) if host is not None: return host return None self.host = _find_host(self) + def configure_operations(self): + for interface in self.interfaces.itervalues(): + interface.configure_operations() + for the_relationship in self.outbound_relationships: + the_relationship.configure_operations() + @property def as_raw(self): return collections.OrderedDict(( @@ -584,6 +598,7 @@ class NodeBase(InstanceModelMixin): # pylint: disable=too-many-public-methods utils.dump_dict_values(self.capabilities, 'Capabilities') utils.dump_list_values(self.outbound_relationships, 'Relationships') + class GroupBase(InstanceModelMixin): """ Usually an instance of a :class:`GroupTemplate`. @@ -1069,6 +1084,10 @@ class RelationshipBase(InstanceModelMixin): # endregion + def configure_operations(self): + for interface in self.interfaces.itervalues(): + interface.configure_operations() + @property def as_raw(self): return collections.OrderedDict(( @@ -1301,6 +1320,10 @@ class InterfaceBase(InstanceModelMixin): # endregion + def configure_operations(self): + for operation in self.operations.itervalues(): + operation.configure() + @property def as_raw(self): return collections.OrderedDict(( @@ -1341,8 +1364,8 @@ class OperationBase(InstanceModelMixin): :vartype operation_template: :class:`OperationTemplate` :ivar description: Human-readable description :vartype description: string - :ivar plugin_specification: Associated plugin - :vartype plugin_specification: :class:`PluginSpecification` + :ivar plugin: Associated plugin + :vartype plugin: :class:`Plugin` :ivar implementation: Implementation (interpreted by the plugin) :vartype implementation: basestring :ivar configuration: Configuration (interpreted by the plugin) @@ -1378,8 +1401,8 @@ class OperationBase(InstanceModelMixin): description = Column(Text) @declared_attr - def plugin_specification(cls): - return relationship.one_to_one(cls, 'plugin_specification') + def plugin(cls): + return relationship.one_to_one(cls, 'plugin') runs_on = Column(Text) implementation = Column(Text) @@ -1407,9 +1430,9 @@ class OperationBase(InstanceModelMixin): return relationship.foreign_key('interface', nullable=True) @declared_attr - def plugin_specification_fk(cls): - """For Operation one-to-one to PluginSpecification""" - return relationship.foreign_key('plugin_specification', nullable=True) + def plugin_fk(cls): + """For Operation one-to-one to Plugin""" + return relationship.foreign_key('plugin', nullable=True) @declared_attr def operation_template_fk(cls): @@ -1418,6 +1441,19 @@ class OperationBase(InstanceModelMixin): # endregion + def configure(self): + if self.implementation is None: + return + if self.interface is not None: + # Operation on an interface + if self.plugin is None: + configure_operation(self) + else: + pass + elif self.service is not None: + # Workflow + pass + @property def as_raw(self): return collections.OrderedDict(( @@ -1443,9 +1479,9 @@ class OperationBase(InstanceModelMixin): if self.description: console.puts(context.style.meta(self.description)) with context.style.indent: - if self.plugin_specification is not None: - console.puts('Plugin specification: {0}'.format( - context.style.literal(self.plugin_specification.name))) + if self.plugin is not None: + console.puts('Plugin: {0}'.format( + context.style.literal(self.plugin.name))) if self.implementation is not None: console.puts('Implementation: {0}'.format( context.style.literal(self.implementation))) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/aria/modeling/service_template.py ---------------------------------------------------------------------- diff --git a/aria/modeling/service_template.py b/aria/modeling/service_template.py index 2eb88c9..3e4554c 100644 --- a/aria/modeling/service_template.py +++ b/aria/modeling/service_template.py @@ -1547,9 +1547,21 @@ class OperationTemplateBase(TemplateModelMixin): def instantiate(self, container): from . import models + from ..orchestrator import context + plugin = None + if self.plugin_specification is not None: + try: + workflow_context = context.workflow.current.get() + plugin = self.plugin_specification.find_plugin(workflow_context.model.plugin.list()) + except context.exceptions.ContextException: + pass + # TODO + #context = ConsumptionContext.get_thread_local() + #context.validation.report('plugin not found for specification: {0}'.format( + # self.plugin_specification.name), level=validation.Issue.EXTERNAL) operation = models.Operation(name=self.name, description=deepcopy_with_locators(self.description), - plugin_specification=self.plugin_specification, + plugin=plugin, implementation=self.implementation, configuration=self.configuration, dependencies=self.dependencies, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/aria/orchestrator/execution_plugin/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/__init__.py b/aria/orchestrator/execution_plugin/__init__.py index 5a79cbe..7bd5f02 100644 --- a/aria/orchestrator/execution_plugin/__init__.py +++ b/aria/orchestrator/execution_plugin/__init__.py @@ -14,8 +14,6 @@ # limitations under the License. from contextlib import contextmanager -from ...modeling import models -from ...utils.formatting import full_type_name # Populated during execution of python scripts @@ -34,157 +32,3 @@ def python_script_scope(operation_ctx, operation_inputs): finally: ctx = None inputs = None - - -def configure_operation(operation_task): - from . import operations - - inputs = {} - inputs['script_path'] = operation_task.implementation - inputs['process'] = _get_process(operation_task.configuration.get('process')) - - host = None - if operation_task.actor_type == 'node': - host = operation_task.actor.host - elif operation_task.actor_type == 'relationship': - edge = operation_task.configuration.get('edge', 'source') - if edge == 'source': - host = operation_task.actor.source_node.host - elif edge == 'target': - host = operation_task.actor.target_node.host - else: - raise ValueError('"edge" configuration must be "source" or "target" for "{0}": {1}' - .format(operation_task.implementation, edge)) - ip = host.ip if host is not None else None - #ip = '1.2.3.4' - - if ip is None: - _configure_local(operation_task) - else: - _configure_remote(operation_task, inputs, ip) - - for k, v in inputs.iteritems(): - operation_task.inputs[k] = models.Parameter.wrap(k, v) - - -def _configure_local(operation_task): - """ - Local operation. - """ - operation_task.implementation = '{0}.{1}'.format(operations.__name__, - operations.run_script_locally.__name__) - - -def _configure_remote(operation_task, inputs, ip): - """ - Remote SSH operation via Fabric. - """ - default_user = 'admin' - default_password = 'admin' - ssh = _get_ssh(operation_task.configuration.get('ssh')) - if 'user' not in ssh: - ssh['user'] = default_user - if ('password' not in ssh) and ('key' not in ssh) and ('key_filename' not in ssh): - ssh['password'] = default_password - inputs['use_sudo'] = ssh.get('use_sudo') - inputs['hide_output'] = ssh.get('hide_output') - inputs['fabric_env'] = {} - inputs['fabric_env']['host_string'] = ip - if 'warn_only' in ssh: - inputs['fabric_env']['warn_only'] = ssh['warn_only'] - inputs['fabric_env']['user'] = ssh.get('user') - inputs['fabric_env']['password'] = ssh.get('password') - inputs['fabric_env']['key'] = ssh.get('key') - inputs['fabric_env']['key_filename'] = ssh.get('key_filename') - - if inputs['fabric_env'].get('user') is None: - raise ValueError('must configure "ssh.user" for "{0}"' - .format(operation_task.implementation)) - if (inputs['fabric_env'].get('password') is None) and \ - (inputs['fabric_env'].get('key') is None) and \ - (inputs['fabric_env'].get('key_filename') is None): - raise ValueError( - 'must configure "ssh.password", "ssh.key", or "ssh.key_filename" for "{0}"' - .format(operation_task.implementation)) - - operation_task.implementation = '{0}.{1}'.format(operations.__name__, - operations.run_script_with_ssh.__name__) - - -def _get_process(value): - from ..workflows.exceptions import TaskCreationException - if value is None: - return None - _validate_type(value, dict, 'process') - for k, v in value.iteritems(): - if k == 'eval_python': - value[k] = _str_to_bool(v, 'process.eval_python') - elif k == 'cwd': - _validate_type(v, basestring, 'process.cwd') - elif k == 'command_prefix': - _validate_type(v, basestring, 'process.command_prefix') - elif k == 'args': - value[k] = _dict_to_list(v, 'process.args') - elif k == 'env': - _validate_type(v, dict, 'process.env') - else: - raise TaskCreationException('unsupported configuration: "process.{0}"'.format(k)) - return value - - -def _get_ssh(value): - from ..workflows.exceptions import TaskCreationException - if value is None: - return {} - _validate_type(value, dict, 'ssh') - for k, v in value.iteritems(): - if k == 'use_sudo': - value[k] = _str_to_bool(v, 'ssh.use_sudo') - elif k == 'hide_output': - value[k] = _dict_to_list(v, 'ssh.hide_output') - elif k == 'warn_only': - value[k] = _str_to_bool(v, 'ssh.warn_only') - elif k == 'user': - _validate_type(v, basestring, 'ssh.user') - elif k == 'password': - _validate_type(v, basestring, 'ssh.password') - elif k == 'key': - _validate_type(v, basestring, 'ssh.key') - elif k == 'key_filename': - _validate_type(v, basestring, 'ssh.key_filename') - else: - raise TaskCreationException('unsupported configuration: "ssh.{0}"'.format(k)) - return value - - -def _validate_type(value, the_type, name): - from ..workflows.exceptions import TaskCreationException - if not isinstance(value, the_type): - raise TaskCreationException('"{0}" configuration is not a {1}' - .format(name, full_type_name(the_type))) - -def _str_to_bool(value, name): - from ..workflows.exceptions import TaskCreationException - if value is None: - return None - _validate_type(value, basestring, name) - if value == 'true': - return True - elif value == 'false': - return False - else: - raise TaskCreationException('"{0}" configuration is not "true" or "false": {1}' - .format(name, repr(value))) - - -def _dict_to_list(the_dict, name): - from ..workflows.exceptions import TaskCreationException - _validate_type(the_dict, dict, name) - value = [] - for k in sorted(the_dict): - v = the_dict[k] - if not isinstance(v, basestring): - raise TaskCreationException('"{0}.{1}" configuration is not a string: {2}' - .format(name, k, repr(v))) - value.append(v) - return value http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/aria/orchestrator/execution_plugin/instantiation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/instantiation.py b/aria/orchestrator/execution_plugin/instantiation.py new file mode 100644 index 0000000..99d7e6e --- /dev/null +++ b/aria/orchestrator/execution_plugin/instantiation.py @@ -0,0 +1,188 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ...utils.formatting import full_type_name +from ...utils.collections import OrderedDict +from ...parser import validation +from ...parser.consumption import ConsumptionContext + + +def configure_operation(operation): + configuration = operation.configuration or {} + inputs = OrderedDict() + inputs['script_path'] = operation.implementation + inputs['process'] = _get_process(configuration.get('process')) + + host = None + interface = operation.interface + if interface.node is not None: + host = interface.node.host + elif interface.relationship is not None: + edge = configuration.get('edge', 'source') + if edge == 'source': + host = interface.relationship.source_node.host + elif edge == 'target': + host = interface.relationship.target_node.host + else: + context = ConsumptionContext.get_thread_local() + context.validation.report('"edge" configuration must be "source" or "target" for "{0}":' + ' {1}'.format(operation.implementation, edge), + level=validation.Issue.BETWEEN_TYPES) + # TODO: host address would not be available + host_address = host.host_address if host is not None else None + + if host_address is None: + _configure_local(operation) + else: + _configure_remote(operation, inputs, host_address) + + from ...modeling import models + for k, v in inputs.iteritems(): + operation.inputs[k] = models.Parameter.wrap(k, v) + + +def _configure_local(operation): + """ + Local operation. + """ + from . import operations + operation.implementation = '{0}.{1}'.format(operations.__name__, + operations.run_script_locally.__name__) + + +def _configure_remote(operation, inputs, host_address): + """ + Remote SSH operation via Fabric. + """ + default_user = 'admin' + default_password = 'admin' + configuration = operation.configuration or {} + ssh = _get_ssh(configuration.get('ssh')) + if 'user' not in ssh: + ssh['user'] = default_user + if ('password' not in ssh) and ('key' not in ssh) and ('key_filename' not in ssh): + ssh['password'] = default_password + inputs['use_sudo'] = ssh.get('use_sudo') + inputs['hide_output'] = ssh.get('hide_output') + inputs['fabric_env'] = {} + inputs['fabric_env']['host_string'] = host_address + if 'warn_only' in ssh: + inputs['fabric_env']['warn_only'] = ssh['warn_only'] + inputs['fabric_env']['user'] = ssh.get('user') + inputs['fabric_env']['password'] = ssh.get('password') + inputs['fabric_env']['key'] = ssh.get('key') + inputs['fabric_env']['key_filename'] = ssh.get('key_filename') + + if inputs['fabric_env'].get('user') is None: + context = ConsumptionContext.get_thread_local() + context.validation.report('must configure "ssh.user" for "{0}"' + .format(operation.implementation), + level=validation.Issue.BETWEEN_TYPES) + if (inputs['fabric_env'].get('password') is None) and \ + (inputs['fabric_env'].get('key') is None) and \ + (inputs['fabric_env'].get('key_filename') is None): + context = ConsumptionContext.get_thread_local() + context.validation.report('must configure "ssh.password", "ssh.key", or "ssh.key_filename" ' + 'for "{0}"' + .format(operation.implementation), + level=validation.Issue.BETWEEN_TYPES) + + from . import operations + operation.implementation = '{0}.{1}'.format(operations.__name__, + operations.run_script_with_ssh.__name__) + + +def _get_process(value): + if value is None: + return None + _validate_type(value, dict, 'process') + for k, v in value.iteritems(): + if k == 'eval_python': + value[k] = _str_to_bool(v, 'process.eval_python') + elif k == 'cwd': + _validate_type(v, basestring, 'process.cwd') + elif k == 'command_prefix': + _validate_type(v, basestring, 'process.command_prefix') + elif k == 'args': + value[k] = _dict_to_list(v, 'process.args') + elif k == 'env': + _validate_type(v, dict, 'process.env') + else: + context = ConsumptionContext.get_thread_local() + context.validation.report('unsupported configuration: "process.{0}"'.format(k), + level=validation.Issue.BETWEEN_TYPES) + return value + + +def _get_ssh(value): + if value is None: + return {} + _validate_type(value, dict, 'ssh') + for k, v in value.iteritems(): + if k == 'use_sudo': + value[k] = _str_to_bool(v, 'ssh.use_sudo') + elif k == 'hide_output': + value[k] = _dict_to_list(v, 'ssh.hide_output') + elif k == 'warn_only': + value[k] = _str_to_bool(v, 'ssh.warn_only') + elif k == 'user': + _validate_type(v, basestring, 'ssh.user') + elif k == 'password': + _validate_type(v, basestring, 'ssh.password') + elif k == 'key': + _validate_type(v, basestring, 'ssh.key') + elif k == 'key_filename': + _validate_type(v, basestring, 'ssh.key_filename') + else: + context = ConsumptionContext.get_thread_local() + context.validation.report('unsupported configuration: "ssh.{0}"'.format(k), + level=validation.Issue.BETWEEN_TYPES) + return value + + +def _validate_type(value, the_type, name): + if not isinstance(value, the_type): + context = ConsumptionContext.get_thread_local() + context.validation.report('"{0}" configuration is not a {1}' + .format(name, full_type_name(the_type)), + level=validation.Issue.BETWEEN_TYPES) + +def _str_to_bool(value, name): + if value is None: + return None + _validate_type(value, basestring, name) + if value == 'true': + return True + elif value == 'false': + return False + else: + context = ConsumptionContext.get_thread_local() + context.validation.report('"{0}" configuration is not "true" or "false": {1}' + .format(name, repr(value)), + level=validation.Issue.BETWEEN_TYPES) + + +def _dict_to_list(the_dict, name): + _validate_type(the_dict, dict, name) + value = [] + for k in sorted(the_dict): + v = the_dict[k] + if not isinstance(v, basestring): + context = ConsumptionContext.get_thread_local() + context.validation.report('"{0}.{1}" configuration is not a string: {2}' + .format(name, k, repr(v)), + level=validation.Issue.BETWEEN_TYPES) + value.append(v) + return value http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/aria/orchestrator/execution_plugin/ssh/operations.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ssh/operations.py b/aria/orchestrator/execution_plugin/ssh/operations.py index d760ba8..7147a30 100644 --- a/aria/orchestrator/execution_plugin/ssh/operations.py +++ b/aria/orchestrator/execution_plugin/ssh/operations.py @@ -144,6 +144,8 @@ def _fabric_env(ctx, fabric_env, warn_only): env.update(fabric_env or {}) env.setdefault('warn_only', warn_only) # validations + if (not env.get('host_string')) and (ctx.task) and (ctx.task.actor) and (ctx.task.actor.host): + env['host_string'] = ctx.task.actor.host.host_address if not env.get('host_string'): ctx.task.abort('`host_string` not supplied and ip cannot be deduced automatically') if not (env.get('password') or env.get('key_filename') or env.get('key')): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index 150863d..ab9f550 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -19,9 +19,8 @@ Provides the tasks to be entered into the task graph import copy from ....modeling import models -from ....utils.collections import (OrderedDict, merge) +from ....utils.collections import (OrderedDict, FrozenDict) from ....utils.uuid import generate_uuid -from ... import execution_plugin from ... import context from .. import exceptions @@ -66,9 +65,8 @@ class OperationTask(BaseTask): actor, actor_type, interface_name, - operation_name, + operation_name, # remove configuration inputs=None, - configuration=None, max_attempts=None, retry_interval=None, ignore_failure=None): @@ -83,7 +81,10 @@ class OperationTask(BaseTask): self.actor = actor self.actor_type = actor_type - operation = self._get_operation(interface_name, operation_name) + operation = None + interface = self.actor.interfaces.get(interface_name) + if interface is not None: + operation = interface.operations.get(operation_name) if operation is None: raise exceptions.OperationNotFoundException( 'Could not find operation "{0}" on interface "{1}" for {2} "{3}"' @@ -105,12 +106,7 @@ class OperationTask(BaseTask): self.ignore_failure = (self.workflow_context._task_ignore_failure if ignore_failure is None else ignore_failure) self.implementation = operation.implementation - self.configuration = {} - - if operation.configuration: - merge(self.configuration, operation.configuration) - if configuration: - merge(self.configuration, operation.configuration) + self.plugin = operation.plugin # Wrap inputs inputs = copy.deepcopy(inputs) if inputs else {} @@ -118,21 +114,10 @@ class OperationTask(BaseTask): if not isinstance(v, models.Parameter): inputs[k] = models.Parameter.wrap(k, v) - self.inputs = OperationTask._merge_inputs(operation.inputs, inputs) - - self.plugin = None - if operation.plugin_specification: - self.plugin = OperationTask._find_plugin(operation.plugin_specification) - if self.plugin is None: - raise exceptions.OperationNotFoundException() ### - raise exceptions.PluginNotFoundException( - 'Could not find plugin "{0}" of operation "{1}" on interface "{2}" for ' - '{3} "{4}"' - .format(operation.plugin_specification.name, operation_name, interface_name, - actor_type, actor.name)) - else: - # Default plugin (execution) - execution_plugin.configure_operation(self) + self.inputs = OrderedDict(operation.inputs) + if inputs: + self.inputs.update(inputs) + self.inputs = FrozenDict(self.inputs) @classmethod def for_node(cls, @@ -140,7 +125,6 @@ class OperationTask(BaseTask): interface_name, operation_name, inputs=None, - configuration=None, max_attempts=None, retry_interval=None, ignore_failure=None): @@ -151,7 +135,6 @@ class OperationTask(BaseTask): :param interface_name: The interface name :param operation_name: The operation name within the interface :param inputs: Override the operation's inputs - :param configuration: Override the operation's configuration :param max_attempts: The maximum number of attempts in case the operation fails (if not specified the defaults it taken from the workflow context) :param retry_interval: The interval in seconds between attempts when the operation fails @@ -167,7 +150,6 @@ class OperationTask(BaseTask): interface_name=interface_name, operation_name=operation_name, inputs=inputs, - configuration=configuration, max_attempts=max_attempts, retry_interval=retry_interval, ignore_failure=ignore_failure) @@ -178,7 +160,6 @@ class OperationTask(BaseTask): interface_name, operation_name, inputs=None, - configuration=None, max_attempts=None, retry_interval=None, ignore_failure=None): @@ -189,7 +170,6 @@ class OperationTask(BaseTask): :param interface_name: The interface name :param operation_name: The operation name within the interface :param inputs: Override the operation's inputs - :param configuration: Override the operation's configuration :param max_attempts: The maximum number of attempts in case the operation fails (if not specified the defaults it taken from the workflow context) :param retry_interval: The interval in seconds between attempts when the operation fails @@ -205,29 +185,10 @@ class OperationTask(BaseTask): interface_name=interface_name, operation_name=operation_name, inputs=inputs, - configuration=configuration, max_attempts=max_attempts, retry_interval=retry_interval, ignore_failure=ignore_failure) - def _get_operation(self, interface_name, operation_name): - interface = self.actor.interfaces.get(interface_name) - if interface is not None: - return interface.operations.get(operation_name) - return None - - @staticmethod - def _find_plugin(plugin_specification): - workflow_context = context.workflow.current.get() - return plugin_specification.find_plugin(workflow_context.model.plugin.list()) - - @staticmethod - def _merge_inputs(operation_inputs, override_inputs=None): - final_inputs = OrderedDict(operation_inputs) - if override_inputs: - final_inputs.update(override_inputs) - return final_inputs - class WorkflowTask(BaseTask): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/aria/parser/consumption/modeling.py ---------------------------------------------------------------------- diff --git a/aria/parser/consumption/modeling.py b/aria/parser/consumption/modeling.py index 0163e31..6c616b4 100644 --- a/aria/parser/consumption/modeling.py +++ b/aria/parser/consumption/modeling.py @@ -103,7 +103,7 @@ class InstantiateServiceInstance(Consumer): def consume(self): if self.context.modeling.template is None: self.context.validation.report('InstantiateServiceInstance consumer: missing service ' - 'model') + 'template') return self.context.modeling.template.instantiate(None) @@ -154,6 +154,15 @@ class FindHosts(Consumer): self.context.modeling.instance.find_hosts() +class ConfigureOperations(Consumer): + """ + Configures all operations in the service instance. + """ + + def consume(self): + self.context.modeling.instance.configure_operations() + + class ServiceInstance(ConsumerChain): """ Generates the service instance by instantiating the service template. @@ -168,6 +177,7 @@ class ServiceInstance(ConsumerChain): CoerceServiceInstanceValues, ValidateCapabilities, FindHosts, + ConfigureOperations, CoerceServiceInstanceValues)) def dump(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/examples/tosca-simple-1.0/use-cases/multi-tier-1/custom_types/elasticsearch.yaml ---------------------------------------------------------------------- diff --git a/examples/tosca-simple-1.0/use-cases/multi-tier-1/custom_types/elasticsearch.yaml b/examples/tosca-simple-1.0/use-cases/multi-tier-1/custom_types/elasticsearch.yaml index 32623d1..72b210a 100644 --- a/examples/tosca-simple-1.0/use-cases/multi-tier-1/custom_types/elasticsearch.yaml +++ b/examples/tosca-simple-1.0/use-cases/multi-tier-1/custom_types/elasticsearch.yaml @@ -4,3 +4,5 @@ node_types: tosca.nodes.SoftwareComponent.Elasticsearch: derived_from: tosca.nodes.SoftwareComponent + capabilities: + app: tosca.capabilities.Endpoint http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/examples/tosca-simple-1.0/use-cases/multi-tier-1/custom_types/kibana.yaml ---------------------------------------------------------------------- diff --git a/examples/tosca-simple-1.0/use-cases/multi-tier-1/custom_types/kibana.yaml b/examples/tosca-simple-1.0/use-cases/multi-tier-1/custom_types/kibana.yaml index 7af00d0..4ee8700 100644 --- a/examples/tosca-simple-1.0/use-cases/multi-tier-1/custom_types/kibana.yaml +++ b/examples/tosca-simple-1.0/use-cases/multi-tier-1/custom_types/kibana.yaml @@ -8,3 +8,5 @@ node_types: - search_endpoint: capability: tosca.capabilities.Endpoint relationship: tosca.relationships.ConnectsTo + capabilities: + app: tosca.capabilities.Endpoint http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/examples/tosca-simple-1.0/use-cases/multi-tier-1/custom_types/logstash.yaml ---------------------------------------------------------------------- diff --git a/examples/tosca-simple-1.0/use-cases/multi-tier-1/custom_types/logstash.yaml b/examples/tosca-simple-1.0/use-cases/multi-tier-1/custom_types/logstash.yaml index a3eebbe..ea74c7e 100644 --- a/examples/tosca-simple-1.0/use-cases/multi-tier-1/custom_types/logstash.yaml +++ b/examples/tosca-simple-1.0/use-cases/multi-tier-1/custom_types/logstash.yaml @@ -8,3 +8,5 @@ node_types: - search_endpoint: capability: tosca.capabilities.Endpoint relationship: tosca.relationships.ConnectsTo + capabilities: + app: tosca.capabilities.Endpoint http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/examples/tosca-simple-1.0/use-cases/webserver-dbms-2/custom_types/paypalpizzastore_nodejs_app.yaml ---------------------------------------------------------------------- diff --git a/examples/tosca-simple-1.0/use-cases/webserver-dbms-2/custom_types/paypalpizzastore_nodejs_app.yaml b/examples/tosca-simple-1.0/use-cases/webserver-dbms-2/custom_types/paypalpizzastore_nodejs_app.yaml index 4723a3f..02bb399 100644 --- a/examples/tosca-simple-1.0/use-cases/webserver-dbms-2/custom_types/paypalpizzastore_nodejs_app.yaml +++ b/examples/tosca-simple-1.0/use-cases/webserver-dbms-2/custom_types/paypalpizzastore_nodejs_app.yaml @@ -9,7 +9,7 @@ node_types: type: string requirements: - database_connection: - capability: tosca.capabilities.Container + capability: tosca.capabilities.Node tosca.nodes.WebServer.Nodejs: derived_from: tosca.nodes.WebServer http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/examples/tosca-simple-1.0/use-cases/webserver-dbms-2/webserver-dbms-2.yaml ---------------------------------------------------------------------- diff --git a/examples/tosca-simple-1.0/use-cases/webserver-dbms-2/webserver-dbms-2.yaml b/examples/tosca-simple-1.0/use-cases/webserver-dbms-2/webserver-dbms-2.yaml index 66eab8e..91f0b35 100644 --- a/examples/tosca-simple-1.0/use-cases/webserver-dbms-2/webserver-dbms-2.yaml +++ b/examples/tosca-simple-1.0/use-cases/webserver-dbms-2/webserver-dbms-2.yaml @@ -53,7 +53,7 @@ topology_template: implementation: scripts/nodejs/configure.sh inputs: github_url: { get_property: [ SELF, github_url ] } - mongodb_ip: { get_attribute: [mongo_server, private_address] } + mongodb_ip: { get_attribute: [ mongo_server, private_address ] } start: scripts/nodejs/start.sh nodejs: @@ -86,7 +86,7 @@ topology_template: configure: implementation: mongodb/config.sh inputs: - mongodb_ip: { get_attribute: [mongo_server, private_address] } + mongodb_ip: { get_attribute: [ mongo_server, private_address ] } start: mongodb/start.sh mongo_server: @@ -109,7 +109,7 @@ topology_template: nodejs_url: description: URL for the nodejs server, http://<IP>:3000 - value: { get_attribute: [app_server, private_address] } + value: { get_attribute: [ app_server, private_address ] } mongodb_url: description: URL for the mongodb server. value: { get_attribute: [ mongo_server, private_address ] } http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/extensions/aria_extension_tosca/simple_v1_0/modeling/capabilities.py ---------------------------------------------------------------------- diff --git a/extensions/aria_extension_tosca/simple_v1_0/modeling/capabilities.py b/extensions/aria_extension_tosca/simple_v1_0/modeling/capabilities.py index d9b9f6b..7be7239 100644 --- a/extensions/aria_extension_tosca/simple_v1_0/modeling/capabilities.py +++ b/extensions/aria_extension_tosca/simple_v1_0/modeling/capabilities.py @@ -78,6 +78,11 @@ def get_inherited_capability_definitions(context, presentation, for_presentation #capability_definitions[capability_name] = capability_definition else: capability_definition = our_capability_definition._clone(for_presentation) + if isinstance(capability_definition._raw, basestring): + # # Make sure we have a dict + the_type = capability_definition._raw + capability_definition._raw = OrderedDict() + capability_definition._raw['type'] = the_type capability_definitions[capability_name] = capability_definition merge_capability_definition_from_type(context, presentation, capability_definition) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/tests/modeling/test_models.py ---------------------------------------------------------------------- diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py index c3b98c1..2e117fa 100644 --- a/tests/modeling/test_models.py +++ b/tests/modeling/test_models.py @@ -583,48 +583,49 @@ class TestNode(object): node_template_storage.service.list()[0] -class TestNodeIP(object): +class TestNodeHostAddress(object): - ip = '1.1.1.1' + host_address = '1.1.1.1' - def test_ip_on_none_hosted_node(self, service_storage): - node_template = self._node_template(service_storage, ip='not considered') + def test_host_address_on_none_hosted_node(self, service_storage): + node_template = self._node_template(service_storage, host_address='not considered') node = self._node(service_storage, node_template, is_host=False, - ip='not considered') - assert node.ip is None + host_address='not considered') + assert node.host_address is None - def test_property_ip_on_host_node(self, service_storage): - node_template = self._node_template(service_storage, ip=self.ip) - node = self._node(service_storage, node_template, is_host=True, ip=None) - assert node.ip == self.ip + def test_property_host_address_on_host_node(self, service_storage): + node_template = self._node_template(service_storage, host_address=self.host_address) + node = self._node(service_storage, node_template, is_host=True, host_address=None) + assert node.host_address == self.host_address - def test_runtime_property_ip_on_host_node(self, service_storage): - node_template = self._node_template(service_storage, ip='not considered') - node = self._node(service_storage, node_template, is_host=True, ip=self.ip) - assert node.ip == self.ip + def test_runtime_property_host_address_on_host_node(self, service_storage): + node_template = self._node_template(service_storage, host_address='not considered') + node = self._node(service_storage, node_template, is_host=True, + host_address=self.host_address) + assert node.host_address == self.host_address - def test_no_ip_configured_on_host_node(self, service_storage): - node_template = self._node_template(service_storage, ip=None) - node = self._node(service_storage, node_template, is_host=True, ip=None) - assert node.ip is None + def test_no_host_address_configured_on_host_node(self, service_storage): + node_template = self._node_template(service_storage, host_address=None) + node = self._node(service_storage, node_template, is_host=True, host_address=None) + assert node.host_address is None def test_runtime_property_on_hosted_node(self, service_storage): - host_node_template = self._node_template(service_storage, ip=None) + host_node_template = self._node_template(service_storage, host_address=None) host_node = self._node(service_storage, host_node_template, is_host=True, - ip=self.ip) - node_template = self._node_template(service_storage, ip=None) + host_address=self.host_address) + node_template = self._node_template(service_storage, host_address=None) node = self._node(service_storage, node_template, is_host=False, - ip=None, + host_address=None, host_fk=host_node.id) - assert node.ip == self.ip + assert node.host_address == self.host_address - def _node_template(self, storage, ip): + def _node_template(self, storage, host_address): kwargs = dict( name='node_template', type=storage.type.list()[0], @@ -633,23 +634,27 @@ class TestNodeIP(object): min_instances=1, service_template=storage.service_template.list()[0] ) - if ip: - kwargs['properties'] = {'ip': Parameter.wrap('ip', ip)} + if host_address: + kwargs['properties'] = {'host_address': Parameter.wrap('host_address', host_address)} node = NodeTemplate(**kwargs) storage.node_template.put(node) return node - def _node(self, storage, node, is_host, ip, host_fk=None): + def _node(self, storage, node_template, is_host, host_address, host_fk=None): kwargs = dict( name='node', - node_template=node, + node_template=node_template, type=storage.type.list()[0], runtime_properties={}, state='', service=storage.service.list()[0] ) - if ip: - kwargs['runtime_properties']['ip'] = ip + if is_host and (host_address is None): + host_address = node_template.properties.get('host_address') + if host_address is not None: + host_address = host_address.value + if host_address: + kwargs['runtime_properties']['host_address'] = host_address if is_host: kwargs['host_fk'] = 1 elif host_fk: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/tests/orchestrator/context/test_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py index 6721b29..89dd511 100644 --- a/tests/orchestrator/context/test_operation.py +++ b/tests/orchestrator/context/test_operation.py @@ -230,7 +230,6 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir): plugin = mock.models.create_plugin() ctx.model.plugin.put(plugin) - plugin_specification = mock.models.create_plugin_specification() node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) interface = mock.models.create_interface( node.service, @@ -238,7 +237,7 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir): operation_name, operation_kwargs=dict( implementation='{0}.{1}'.format(__name__, _test_plugin_workdir.__name__), - plugin_specification=plugin_specification) + plugin=plugin) ) node.interfaces[interface.name] = interface ctx.model.node.update(node) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/tests/orchestrator/context/test_serialize.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py index db45e8e..8b809b3 100644 --- a/tests/orchestrator/context/test_serialize.py +++ b/tests/orchestrator/context/test_serialize.py @@ -45,13 +45,12 @@ def _mock_workflow(ctx, graph): node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) plugin = mock.models.create_plugin() ctx.model.plugin.put(plugin) - plugin_specification = mock.models.create_plugin_specification() interface = mock.models.create_interface( node.service, 'test', 'op', operation_kwargs=dict(implementation=_operation_mapping(), - plugin_specification=plugin_specification) + plugin=plugin) ) node.interfaces[interface.name] = interface task = api.task.OperationTask.for_node(node=node, interface_name='test', operation_name='op') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/tests/orchestrator/execution_plugin/test_ssh.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py index dd36466..d86b6d2 100644 --- a/tests/orchestrator/execution_plugin/test_ssh.py +++ b/tests/orchestrator/execution_plugin/test_ssh.py @@ -292,12 +292,13 @@ class TestFabricEnvHideGroupsAndRunCommands(object): assert self.mock.settings_merged['timeout'] == timeout def test_implicit_host_string(self, mocker): - expected_ip = '1.1.1.1' - mocker.patch.object(self._Ctx.task.runs_on, 'ip', expected_ip) + expected_host_address = '1.1.1.1' + mocker.patch.object(self._Ctx.task.actor, 'host') + mocker.patch.object(self._Ctx.task.actor.host, 'host_address', expected_host_address) fabric_env = self.default_fabric_env.copy() del fabric_env['host_string'] self._run(fabric_env=fabric_env) - assert self.mock.settings_merged['host_string'] == expected_ip + assert self.mock.settings_merged['host_string'] == expected_host_address def test_explicit_host_string(self): fabric_env = self.default_fabric_env.copy() @@ -409,13 +410,15 @@ class TestFabricEnvHideGroupsAndRunCommands(object): raise RuntimeError class _Ctx(object): - class Stub(object): + class Task(object): @staticmethod def abort(message=None): models.Task.abort(message) - ip = None - task = Stub - task.runs_on = Stub + actor = None + class Actor(object): + host = None + task = Task + task.actor = Actor logger = logging.getLogger() @staticmethod http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/tests/orchestrator/workflows/api/test_task.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/api/test_task.py b/tests/orchestrator/workflows/api/test_task.py index 80d2351..a705199 100644 --- a/tests/orchestrator/workflows/api/test_task.py +++ b/tests/orchestrator/workflows/api/test_task.py @@ -18,7 +18,6 @@ import pytest from aria.orchestrator import context from aria.orchestrator.workflows import api -from aria.modeling import models from tests import mock, storage @@ -45,13 +44,11 @@ class TestOperationTask(object): plugin = mock.models.create_plugin('test_plugin', '0.1') ctx.model.node.update(plugin) - plugin_specification = mock.models.create_plugin_specification('test_plugin', '0.1') - interface = mock.models.create_interface( ctx.service, interface_name, operation_name, - operation_kwargs=dict(plugin_specification=plugin_specification, + operation_kwargs=dict(plugin=plugin, implementation='op_path')) node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME) @@ -85,7 +82,6 @@ class TestOperationTask(object): assert api_task.max_attempts == max_attempts assert api_task.ignore_failure == ignore_failure assert api_task.plugin.name == 'test_plugin' - assert api_task.runs_on == models.Task.RUNS_ON_NODE def test_source_relationship_operation_task_creation(self, ctx): interface_name = 'test_interface' @@ -94,13 +90,11 @@ class TestOperationTask(object): plugin = mock.models.create_plugin('test_plugin', '0.1') ctx.model.plugin.update(plugin) - plugin_specification = mock.models.create_plugin_specification('test_plugin', '0.1') - interface = mock.models.create_interface( ctx.service, interface_name, operation_name, - operation_kwargs=dict(plugin_specification=plugin_specification, + operation_kwargs=dict(plugin=plugin, implementation='op_path') ) @@ -131,7 +125,6 @@ class TestOperationTask(object): assert api_task.retry_interval == retry_interval assert api_task.max_attempts == max_attempts assert api_task.plugin.name == 'test_plugin' - assert api_task.runs_on == models.Task.RUNS_ON_SOURCE def test_target_relationship_operation_task_creation(self, ctx): interface_name = 'test_interface' @@ -140,13 +133,11 @@ class TestOperationTask(object): plugin = mock.models.create_plugin('test_plugin', '0.1') ctx.model.node.update(plugin) - plugin_specification = mock.models.create_plugin_specification('test_plugin', '0.1') - interface = mock.models.create_interface( ctx.service, interface_name, operation_name, - operation_kwargs=dict(plugin_specification=plugin_specification, + operation_kwargs=dict(plugin=plugin, implementation='op_path') ) @@ -163,8 +154,7 @@ class TestOperationTask(object): operation_name=operation_name, inputs=inputs, max_attempts=max_attempts, - retry_interval=retry_interval, - runs_on=models.Task.RUNS_ON_TARGET) + retry_interval=retry_interval) assert api_task.name == api.task.OperationTask.NAME_FORMAT.format( type='relationship', @@ -178,7 +168,6 @@ class TestOperationTask(object): assert api_task.retry_interval == retry_interval assert api_task.max_attempts == max_attempts assert api_task.plugin.name == 'test_plugin' - assert api_task.runs_on == models.Task.RUNS_ON_TARGET def test_operation_task_default_values(self, ctx): interface_name = 'test_interface' @@ -187,15 +176,13 @@ class TestOperationTask(object): plugin = mock.models.create_plugin('package', '0.1') ctx.model.node.update(plugin) - plugin_specification = mock.models.create_plugin_specification('package', '0.1') - dependency_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) interface = mock.models.create_interface( ctx.service, interface_name, operation_name, - operation_kwargs=dict(plugin_specification=plugin_specification, + operation_kwargs=dict(plugin=plugin, implementation='op_path')) dependency_node.interfaces[interface_name] = interface @@ -210,7 +197,6 @@ class TestOperationTask(object): assert task.max_attempts == ctx._task_max_attempts assert task.ignore_failure == ctx._task_ignore_failure assert task.plugin is plugin - assert task.runs_on == models.Task.RUNS_ON_NODE class TestWorkflowTask(object): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/tests/orchestrator/workflows/builtin/test_execute_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/builtin/test_execute_operation.py b/tests/orchestrator/workflows/builtin/test_execute_operation.py index 360e17d..4cddbe6 100644 --- a/tests/orchestrator/workflows/builtin/test_execute_operation.py +++ b/tests/orchestrator/workflows/builtin/test_execute_operation.py @@ -34,7 +34,8 @@ def test_execute_operation(ctx): interface = mock.models.create_interface( ctx.service, interface_name, - operation_name + operation_name, + operation_kwargs={'implementation': 'test'} ) node.interfaces[interface.name] = interface ctx.model.node.update(node) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/tests/orchestrator/workflows/core/test_task.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py index 18ca056..8dda209 100644 --- a/tests/orchestrator/workflows/core/test_task.py +++ b/tests/orchestrator/workflows/core/test_task.py @@ -19,7 +19,6 @@ from datetime import ( import pytest -from aria.modeling import models from aria.orchestrator.context import workflow as workflow_context from aria.orchestrator.workflows import ( api, @@ -43,7 +42,8 @@ def ctx(tmpdir): interface = mock.models.create_interface( relationship.source_node.service, RELATIONSHIP_INTERFACE_NAME, - RELATIONSHIP_OPERATION_NAME + RELATIONSHIP_OPERATION_NAME, + operation_kwargs={'implementation': 'test'} ) relationship.interfaces[interface.name] = interface context.model.relationship.update(relationship) @@ -52,7 +52,8 @@ def ctx(tmpdir): interface = mock.models.create_interface( node.service, NODE_INTERFACE_NAME, - NODE_OPERATION_NAME + NODE_OPERATION_NAME, + operation_kwargs={'implementation': 'test'} ) node.interfaces[interface.name] = interface context.model.node.update(node) @@ -72,13 +73,12 @@ class TestOperationTask(object): core_task = core.task.OperationTask(api_task=api_task) return api_task, core_task - def _create_relationship_operation_task(self, ctx, relationship, runs_on): + def _create_relationship_operation_task(self, ctx, relationship): with workflow_context.current.push(ctx): api_task = api.task.OperationTask.for_relationship( relationship=relationship, interface_name=RELATIONSHIP_INTERFACE_NAME, - operation_name=RELATIONSHIP_OPERATION_NAME, - runs_on=runs_on) + operation_name=RELATIONSHIP_OPERATION_NAME) core_task = core.task.OperationTask(api_task=api_task) return api_task, core_task @@ -88,12 +88,11 @@ class TestOperationTask(object): ctx.model.plugin.put(storage_plugin) ctx.model.plugin.put(storage_plugin_other) node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) - storage_plugin_specification = mock.models.create_plugin_specification('p1', '0.1') interface = mock.models.create_interface( node.service, NODE_INTERFACE_NAME, NODE_OPERATION_NAME, - operation_kwargs=dict(plugin_specification=storage_plugin_specification) + operation_kwargs=dict(plugin=storage_plugin, implementation='test') ) node.interfaces[interface.name] = interface ctx.model.node.update(node) @@ -101,7 +100,7 @@ class TestOperationTask(object): storage_task = ctx.model.task.get_by_name(core_task.name) assert storage_task.plugin is storage_plugin assert storage_task.execution_name == ctx.execution.name - assert storage_task.runs_on == core_task.context.node + assert storage_task.actor == core_task.context.node assert core_task.model_task == storage_task assert core_task.name == api_task.name assert core_task.implementation == api_task.implementation @@ -109,18 +108,12 @@ class TestOperationTask(object): assert core_task.inputs == api_task.inputs == storage_task.inputs assert core_task.plugin == storage_plugin - def test_source_relationship_operation_task_creation(self, ctx): + def test_relationship_operation_task_creation(self, ctx): relationship = ctx.model.relationship.list()[0] ctx.model.relationship.update(relationship) _, core_task = self._create_relationship_operation_task( - ctx, relationship, models.Task.RUNS_ON_SOURCE) - assert core_task.model_task.runs_on == relationship.source_node - - def test_target_relationship_operation_task_creation(self, ctx): - relationship = ctx.model.relationship.list()[0] - _, core_task = self._create_relationship_operation_task( - ctx, relationship, models.Task.RUNS_ON_TARGET) - assert core_task.model_task.runs_on == relationship.target_node + ctx, relationship) + assert core_task.model_task.actor == relationship def test_operation_task_edit_locked_attribute(self, ctx): node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py index 0a95d43..514bce9 100644 --- a/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py +++ b/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py @@ -30,7 +30,8 @@ def test_task_graph_into_execution_graph(tmpdir): interface = mock.models.create_interface( node.service, interface_name, - operation_name + operation_name, + operation_kwargs={'implementation': 'test'} ) node.interfaces[interface.name] = interface task_context.model.node.update(node) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dabfc307/tests/resources/service-templates/tosca-simple-1.0/node-cellar/node-cellar.yaml ---------------------------------------------------------------------- diff --git a/tests/resources/service-templates/tosca-simple-1.0/node-cellar/node-cellar.yaml b/tests/resources/service-templates/tosca-simple-1.0/node-cellar/node-cellar.yaml index 65fbf92..d163c1e 100644 --- a/tests/resources/service-templates/tosca-simple-1.0/node-cellar/node-cellar.yaml +++ b/tests/resources/service-templates/tosca-simple-1.0/node-cellar/node-cellar.yaml @@ -166,7 +166,7 @@ topology_template: #- { concat: [ process.args.1 >, mongodb ] } - process.args.1 > mongodb - process.args.2 > host - - ssh.user > admin + - ssh.user2 > admin - ssh.password > 1234 - ssh.use_sudo > true