http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index bb9d839..127641f 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -12,22 +12,25 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 """
 A common context for both workflow and operation
 """
+
 import logging
 from contextlib import contextmanager
 from datetime import datetime
 from functools import partial
-from uuid import uuid4

 import jinja2

-from aria import logger as aria_logger
-from aria.storage import (
-    exceptions,
+from aria import (
+    logger as aria_logger,
     modeling
 )
+from aria.storage import exceptions
+
+from ...utils.uuid import generate_uuid


 class BaseContext(object):
@@ -51,17 +54,17 @@ class BaseContext(object):
     def __init__(
             self,
             name,
-            service_instance_id,
+            service_id,
             model_storage,
             resource_storage,
             workdir=None,
             **kwargs):
         super(BaseContext, self).__init__(**kwargs)
         self._name = name
-        self._id = str(uuid4())
+        self._id = generate_uuid(variant='uuid')
         self._model = model_storage
         self._resource = resource_storage
-        self._service_instance_id = service_instance_id
+        self._service_id = service_id
         self._workdir = workdir
         self.logger = None
@@ -89,14 +92,14 @@ class BaseContext(object):
         if self._model._initiator:
             api_kwargs.update(self._model._initiator(**self._model._initiator_kwargs))
         api_kwargs.update(**self._model._api_kwargs)
-        return aria_logger.create_sqla_log_handler(log_cls=modeling.model.Log,
+        return aria_logger.create_sqla_log_handler(log_cls=modeling.models.Log,
                                                    execution_id=self._execution_id,
                                                    **api_kwargs)

     def __repr__(self):
         return (
             '{name}(name={self.name}, '
-            'deployment_id={self._service_instance_id}, '
+            'deployment_id={self._service_id}, '
             .format(name=self.__class__.__name__, self=self))

     @contextmanager
@@ -135,14 +138,14 @@ class BaseContext(object):
         """
         The blueprint model
         """
-        return self.service_instance.service_template
+        return self.service.service_template

     @property
-    def service_instance(self):
+    def service(self):
         """
         The deployment model
         """
-        return self.model.service_instance.get(self._service_instance_id)
+        return self.model.service.get(self._service_id)

     @property
     def name(self):
@@ -165,7 +168,7 @@ class BaseContext(object):
         Download a blueprint resource from the resource storage
         """
         try:
-            self.resource.deployment.download(entry_id=str(self.service_instance.id),
+            self.resource.deployment.download(entry_id=str(self.service.id),
                                               destination=destination,
                                               path=path)
         except exceptions.StorageError:
@@ -190,7 +193,7 @@ class BaseContext(object):
         Read a deployment resource as string from the resource storage
         """
         try:
-            return self.resource.deployment.read(entry_id=str(self.service_instance.id), path=path)
+            return self.resource.deployment.read(entry_id=str(self.service.id), path=path)
         except exceptions.StorageError:
             return self.resource.deployment.read(entry_id=str(self.service_template.id), path=path)
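
The hunks above also show the lookup-with-fallback this refactoring preserves: resources are read under the service (deployment) entry first, then under its service template. A minimal sketch of the pattern, with a stub StorageError standing in for aria.storage.exceptions.StorageError:

    class StorageError(Exception):
        pass

    def read_resource(resource_api, service_id, template_id, path):
        # Prefer the copy stored under the service entry; fall back to the
        # service template's copy if the service has none.
        try:
            return resource_api.read(entry_id=str(service_id), path=path)
        except StorageError:
            return resource_api.read(entry_id=str(template_id), path=path)
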
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index d2716e8..cbd186c 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -33,7 +33,7 @@ class BaseOperationContext(BaseContext):
                  name,
                  model_storage,
                  resource_storage,
-                 service_instance_id,
+                 service_id,
                  task_id,
                  actor_id,
                  execution_id,
@@ -42,7 +42,7 @@ class BaseOperationContext(BaseContext):
             name=name,
             model_storage=model_storage,
             resource_storage=resource_storage,
-            service_instance_id=service_instance_id,
+            service_id=service_id,
             **kwargs)
         self._task_id = task_id
         self._actor_id = actor_id
@@ -79,11 +79,11 @@ class BaseOperationContext(BaseContext):
         """
         A work directory that is unique to the plugin and the deployment id
         """
-        if not self.task.plugin_name:
+        if self.task.plugin is None:
             return None
         plugin_workdir = '{0}/plugins/{1}/{2}'.format(self._workdir,
-                                                      self.service_instance.id,
-                                                      self.task.plugin_name)
+                                                      self.service.id,
+                                                      self.task.plugin.name)
         file.makedirs(plugin_workdir)
         return plugin_workdir
@@ -92,7 +92,7 @@ class BaseOperationContext(BaseContext):
         context_cls = self.__class__
         context_dict = {
             'name': self.name,
-            'service_instance_id': self._service_instance_id,
+            'service_id': self._service_id,
             'task_id': self._task_id,
             'actor_id': self._actor_id,
             'workdir': self._workdir,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index 5de4b51..5f86d9d 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -50,14 +50,14 @@ class WorkflowContext(BaseContext):

     def __repr__(self):
         return (
-            '{name}(deployment_id={self._service_instance_id}, '
+            '{name}(deployment_id={self._service_id}, '
             'workflow_name={self._workflow_name}'.format(
                 name=self.__class__.__name__, self=self))

     def _create_execution(self):
         now = datetime.utcnow()
         execution = self.model.execution.model_cls(
-            service_instance=self.service_instance,
+            service=self.service,
             workflow_name=self._workflow_name,
             created_at=now,
             parameters=self.parameters,
@@ -88,11 +88,11 @@ class WorkflowContext(BaseContext):
         """
         Iterator over nodes
         """
-        key = 'service_instance_{0}'.format(self.model.node_template.model_cls.name_column_name())
+        key = 'service_{0}'.format(self.model.node_template.model_cls.name_column_name())

         return self.model.node_template.iter(
             filters={
-                key: getattr(self.service_instance, self.service_instance.name_column_name())
+                key: getattr(self.service, self.service.name_column_name())
             }
         )
@@ -101,10 +101,10 @@ class WorkflowContext(BaseContext):
         """
         Iterator over node instances
         """
-        key = 'service_instance_{0}'.format(self.model.node.model_cls.name_column_name())
+        key = 'service_{0}'.format(self.model.node.model_cls.name_column_name())

         return self.model.node.iter(
             filters={
-                key: getattr(self.service_instance, self.service_instance.name_column_name())
+                key: getattr(self.service, self.service.name_column_name())
             }
         )
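
The two iterators above build their filter key dynamically from the model's name column. A hedged sketch of what the rename changes, with a stand-in for the SQLAlchemy-backed node model:

    class NodeModel(object):
        # Stand-in for the real model class; only the classmethod used above
        @classmethod
        def name_column_name(cls):
            return 'name'

    # After the rename the key is 'service_<column>' instead of
    # 'service_instance_<column>':
    key = 'service_{0}'.format(NodeModel.name_column_name())
    assert key == 'service_name'
    filters = {key: 'my_service'}  # handed to model_storage.node.iter(filters=filters)
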
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/orchestrator/decorators.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/decorators.py b/aria/orchestrator/decorators.py
index 3ced61c..62e4a14 100644
--- a/aria/orchestrator/decorators.py
+++ b/aria/orchestrator/decorators.py
@@ -17,10 +17,10 @@
 Workflow and operation decorators
 """

-from uuid import uuid4
 from functools import partial, wraps

-from aria.utils.validation import validate_function_arguments
+from ..utils.validation import validate_function_arguments
+from ..utils.uuid import generate_uuid

 from . import context
 from .workflows.api import task_graph
@@ -78,4 +78,4 @@ def operation(func=None, toolbelt=False, suffix_template='', logging_handlers=No
 def _generate_name(func_name, ctx, suffix_template, **custom_kwargs):
     return '{func_name}.{suffix}'.format(
         func_name=func_name,
-        suffix=suffix_template.format(ctx=ctx, **custom_kwargs) or str(uuid4()))
+        suffix=suffix_template.format(ctx=ctx, **custom_kwargs) or generate_uuid(variant='uuid'))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/orchestrator/runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/runner.py b/aria/orchestrator/runner.py
index bb92d1c..f1633fa 100644
--- a/aria/orchestrator/runner.py
+++ b/aria/orchestrator/runner.py
@@ -47,7 +47,7 @@ class Runner(object):
     """

     def __init__(self, workflow_name, workflow_fn, inputs, initialize_model_storage_fn,
-                 service_instance_id, storage_path='', is_storage_temporary=True):
+                 service_id_fn, storage_path='', is_storage_temporary=True):
         if storage_path == '':
             # Temporary file storage
             the_file, storage_path = tempfile.mkstemp(suffix='.db', prefix='aria-')
@@ -58,8 +58,8 @@ class Runner(object):
         self._storage_name = os.path.basename(storage_path)
         self._is_storage_temporary = is_storage_temporary

-        workflow_context = self.create_workflow_context(workflow_name, service_instance_id,
-                                                        initialize_model_storage_fn)
+        workflow_context = self.create_workflow_context(workflow_name, initialize_model_storage_fn,
+                                                        service_id_fn)

         tasks_graph = workflow_fn(ctx=workflow_context, **inputs)
@@ -76,20 +76,21 @@ class Runner(object):

     def create_workflow_context(self,
                                 workflow_name,
-                                service_instance_id,
-                                initialize_model_storage_fn):
+                                initialize_model_storage_fn,
+                                service_id_fn):
         self.cleanup()
         model_storage = application_model_storage(
             sql_mapi.SQLAlchemyModelAPI,
             initiator_kwargs=dict(base_dir=self._storage_dir, filename=self._storage_name))
-        initialize_model_storage_fn(model_storage)
+        if initialize_model_storage_fn:
+            initialize_model_storage_fn(model_storage)
         resource_storage = application_resource_storage(
             filesystem_rapi.FileSystemResourceAPI, api_kwargs=dict(directory='.'))
         return WorkflowContext(
             name=workflow_name,
             model_storage=model_storage,
             resource_storage=resource_storage,
-            service_instance_id=service_instance_id,
+            service_id=service_id_fn(),
             workflow_name=self.__class__.__name__,
             task_max_attempts=1,
             task_retry_interval=1)
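
The switch from a `service_instance_id` value to a `service_id_fn` callable lets the id be resolved only after `initialize_model_storage_fn` has populated storage. A hedged usage sketch; the initializer and workflow function here are hypothetical:

    def initialize(model_storage):
        # Populate model storage with a service, its nodes, etc. (elided)
        pass

    runner = Runner(workflow_name='install',
                    workflow_fn=my_install_workflow,   # hypothetical @workflow function
                    inputs={},
                    initialize_model_storage_fn=initialize,
                    service_id_fn=lambda: 1)           # called only after initialize() has run
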
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index 6a00844..9522d7a 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -16,10 +16,10 @@
 """
 Provides the tasks to be entered into the task graph
 """
-from uuid import uuid4
-
-from aria.storage.modeling import model

+from ....modeling import models
+from ....utils.collections import OrderedDict
+from ....utils.uuid import generate_uuid
 from ... import context
 from .. import exceptions
@@ -28,12 +28,13 @@
 class BaseTask(object):
     """
     Abstract task_graph task
     """
+
     def __init__(self, ctx=None, **kwargs):
         if ctx is not None:
             self._workflow_context = ctx
         else:
             self._workflow_context = context.workflow.current.get()
-        self._id = str(uuid4())
+        self._id = generate_uuid(variant='uuid')

     @property
     def id(self):
@@ -57,33 +58,30 @@ class OperationTask(BaseTask):
     Represents an operation task in the task_graph
     """

-    SOURCE_OPERATION = 'source'
-    TARGET_OPERATION = 'target'
+    NAME_FORMAT = '{interface}:{operation}@{type}:{name}'

     def __init__(self,
-                 name,
                  actor,
-                 implementation,
+                 actor_type,
+                 interface_name,
+                 operation_name,
+                 runs_on=None,
                  max_attempts=None,
                  retry_interval=None,
                  ignore_failure=None,
-                 inputs=None,
-                 plugin=None,
-                 runs_on=None):
+                 inputs=None):
         """
-        Creates an operation task using the name, details, node instance and any additional kwargs.
-        :param name: the operation of the name.
-        :param actor: the operation host on which this operation is registered.
-        :param inputs: operation inputs.
+        Do not call this constructor directly. Instead, use :meth:`for_node` or
+        :meth:`for_relationship`.
         """
-        assert isinstance(actor, (model.Node,
-                                  model.Relationship))
+
+        assert isinstance(actor, (models.Node, models.Relationship))
+        assert actor_type in ('node', 'relationship')
+        assert interface_name and operation_name
+        assert runs_on in models.Task.RUNS_ON
         super(OperationTask, self).__init__()
+
         self.actor = actor
-        self.name = '{name}.{actor.id}'.format(name=name, actor=actor)
-        self.implementation = implementation
-        self.inputs = inputs or {}
-        self.plugin = plugin or {}
         self.max_attempts = (self.workflow_context._task_max_attempts
                              if max_attempts is None else max_attempts)
         self.retry_interval = (self.workflow_context._task_retry_interval
@@ -92,144 +90,137 @@ class OperationTask(BaseTask):
                                if ignore_failure is None else ignore_failure)
         self.runs_on = runs_on

-    @classmethod
-    def _merge_inputs(cls, operation_inputs, additional_inputs=None):
-        final_inputs = dict((p.name, p.as_raw['value']) for p in operation_inputs)
-        final_inputs.update(additional_inputs or {})
-        return final_inputs
+        # Wrap inputs
+        if inputs:
+            for k, v in inputs.iteritems():
+                if not isinstance(v, models.Parameter):
+                    inputs[k] = models.Parameter.wrap(k, v)
+
+        # TODO: Suggestion: these extra inputs could be stored as a separate entry in the task
+        # model, because they are different from the operation inputs. If we do this, then the two
+        # kinds of inputs should *not* be merged here.
+
+        operation = self._get_operation(interface_name, operation_name)
+        if operation is None:
+            raise exceptions.OperationNotFoundException(
+                'Could not find operation "{0}" on interface "{1}" for {2} "{3}"'
+                .format(operation_name, interface_name, actor_type, actor.name))
+
+        self.plugin = None
+        if operation.plugin_specification:
+            self.plugin = OperationTask._find_plugin(operation.plugin_specification)
+            if self.plugin is None:
+                raise exceptions.PluginNotFoundException(
+                    'Could not find plugin of operation "{0}" on interface "{1}" for {2} "{3}"'
+                    .format(operation_name, interface_name, actor_type, actor.name))
+
+        self.implementation = operation.implementation
+        self.inputs = OperationTask._merge_inputs(operation.inputs, inputs)
+
+        self.name = OperationTask.NAME_FORMAT.format(type=actor_type,
+                                                     name=actor.name,
+                                                     interface=interface_name,
+                                                     operation=operation_name)

     @classmethod
-    def node(cls, instance, name, inputs=None, *args, **kwargs):
+    def for_node(cls,
+                 node,
+                 interface_name,
+                 operation_name,
+                 max_attempts=None,
+                 retry_interval=None,
+                 ignore_failure=None,
+                 inputs=None):
         """
-        Represents a node based operation
-
-        :param instance: the node of which this operation belongs to.
-        :param name: the name of the operation.
+        Creates an operation on a node.
+
+        :param node: The node on which to run the operation
+        :param interface_name: The interface name
+        :param operation_name: The operation name within the interface
+        :param max_attempts: The maximum number of attempts in case the operation fails
+                             (if not specified, the default is taken from the workflow context)
+        :param retry_interval: The interval in seconds between attempts when the operation fails
+                               (if not specified, the default is taken from the workflow context)
+        :param ignore_failure: Whether to ignore failures
+                               (if not specified, the default is taken from the workflow context)
+        :param inputs: Additional operation inputs
         """
-        assert isinstance(instance, model.Node)
-        interface_name = _get_interface_name(name)
-        interfaces = instance.interfaces.filter_by(name=interface_name)
-        if interfaces.count() > 1:
-            raise exceptions.TaskException(
-                "More than one interface with the same name `{0}` found".format(name)
-            )
-        elif interfaces.count() == 0:
-            raise exceptions.TaskException(
-                "No Interface with the name `{interface_name}` found".format(
-                    interface_name=interface_name)
-            )
-
-        operation_templates = interfaces[0].operations.filter_by(name=name)
-        if operation_templates.count() > 1:
-            raise exceptions.TaskException(
-                "More than one operation with the same name `{0}` were found".format(name)
-            )
-
-        elif operation_templates.count() == 0:
-            raise exceptions.TaskException(
-                "No interface with the name `{operation_name}` found".format(
-                    operation_name=name)
-            )
-
-        return cls._instance(
-            instance=instance,
-            name=name,
-            operation_template=operation_templates[0],
-            plugins=instance.plugins or [],
-            runs_on=model.Task.RUNS_ON_NODE_INSTANCE,
-            inputs=cls._merge_inputs(operation_templates[0].inputs, inputs),
-            *args,
-            **kwargs)
+
+        assert isinstance(node, models.Node)
+        return cls(
+            actor=node,
+            actor_type='node',
+            interface_name=interface_name,
+            operation_name=operation_name,
+            max_attempts=max_attempts,
+            retry_interval=retry_interval,
+            ignore_failure=ignore_failure,
+            inputs=inputs,
+            runs_on=models.Task.RUNS_ON_NODE)

     @classmethod
-    def relationship(cls, instance, name, edge, runs_on=None, inputs=None, *args,
-                     **kwargs):
+    def for_relationship(cls,
+                         relationship,
+                         interface_name,
+                         operation_name,
+                         runs_on=models.Task.RUNS_ON_SOURCE,
+                         max_attempts=None,
+                         retry_interval=None,
+                         ignore_failure=None,
+                         inputs=None):
         """
-        Represents a relationship based operation
-
-        :param instance: the relationship of which this operation belongs to.
-        :param name: the name of the operation.
-        :param edge: the edge of the interface ("source" or "target").
-        :param runs_on: where to run the operation ("source" or "target"); if None defaults to the
-                        interface edge.
-        :param inputs: any additional inputs to the operation
+        Creates an operation on a relationship edge.
+
+        :param relationship: The relationship on which to run the operation
+        :param interface_name: The interface name
+        :param operation_name: The operation name within the interface
+        :param runs_on: where to run the operation ("source" or "target"); defaults to "source"
+        :param max_attempts: The maximum number of attempts in case the operation fails
+                             (if not specified, the default is taken from the workflow context)
+        :param retry_interval: The interval in seconds between attempts when the operation fails
+                               (if not specified, the default is taken from the workflow context)
+        :param ignore_failure: Whether to ignore failures
+                               (if not specified, the default is taken from the workflow context)
+        :param inputs: Additional operation inputs
         """
-        assert isinstance(instance, model.Relationship)
-        interface_name = _get_interface_name(name)
-        interfaces = instance.interfaces.filter_by(name=interface_name, edge=edge)
-        count = interfaces.count()
-        if count > 1:
-            raise exceptions.TaskException(
-                "More than one interface with the same name `{interface_name}` found at `{edge}`"
-                + " edge".format(
-                    interface_name=interface_name, edge=edge)
-            )
-        elif count == 0:
-            raise exceptions.TaskException(
-                "No interface with the name `{interface_name}` found at `{edge}` edge".format(
-                    interface_name=interface_name, edge=edge)
-            )
-
-        operations = interfaces.all()[0].operations.filter_by(name=name)
-        count = operations.count()
-        if count > 1:
-            raise exceptions.TaskException(
-                "More than one operation with the same name `{0}` found".format(name)
-            )
-        elif count == 0:
-            raise exceptions.TaskException(
-                "No operation with the name `{operation_name}` found".format(
-                    operation_name=name)
-            )
-
-        if not runs_on:
-            if edge == cls.SOURCE_OPERATION:
-                runs_on = model.Task.RUNS_ON_SOURCE
-            else:
-                runs_on = model.Task.RUNS_ON_TARGET
-
-        if runs_on == model.Task.RUNS_ON_SOURCE:
-            plugins = instance.source_node.plugins
-        else:
-            plugins = instance.target_node.plugins
-
-        return cls._instance(instance=instance,
-                             name=name,
-                             operation_template=operations[0],
-                             plugins=plugins or [],
-                             runs_on=runs_on,
-                             inputs=cls._merge_inputs(operations[0].inputs, inputs),
-                             *args,
-                             **kwargs)
-
-    @classmethod
-    def _instance(cls,
-                  instance,
-                  name,
-                  operation_template,
-                  inputs,
-                  plugins,
-                  runs_on,
-                  *args,
-                  **kwargs):
-        matching_plugins = [p for p in plugins if p['name'] == operation_template.plugin]
-        # All matching plugins should have identical package_name/package_version, so it's safe to
-        # take the first found.
-        plugin = matching_plugins[0] if matching_plugins else {}
-        return cls(actor=instance,
-                   name=name,
-                   implementation=operation_template.implementation,
-                   inputs=inputs,
-                   plugin=plugin,
-                   runs_on=runs_on,
-                   *args,
-                   **kwargs)
+        assert isinstance(relationship, models.Relationship)
+        assert runs_on in models.Task.RUNS_ON
+        return cls(
+            actor=relationship,
+            actor_type='relationship',
+            interface_name=interface_name,
+            operation_name=operation_name,
+            runs_on=runs_on,
+            max_attempts=max_attempts,
+            retry_interval=retry_interval,
+            ignore_failure=ignore_failure,
+            inputs=inputs)
+
+    def _get_operation(self, interface_name, operation_name):
+        interface = self.actor.interfaces.get(interface_name)
+        if interface is not None:
+            return interface.operations.get(operation_name)
+        return None
+
+    @staticmethod
+    def _find_plugin(plugin_specification):
+        workflow_context = context.workflow.current.get()
+        return plugin_specification.find_plugin(workflow_context.model.plugin.list())
+
+    @staticmethod
+    def _merge_inputs(operation_inputs, override_inputs=None):
+        final_inputs = OrderedDict(operation_inputs)
+        if override_inputs:
+            final_inputs.update(override_inputs)
+        return final_inputs


 class WorkflowTask(BaseTask):
     """
-    Represents an workflow task in the task_graph
+    Represents a workflow task in the task graph
     """
+
     def __init__(self, workflow_func, **kwargs):
         """
         Creates a workflow based task using the workflow_func provided, and its kwargs
@@ -259,8 +250,3 @@ class StubTask(BaseTask):
     """
     Enables creating empty tasks.
     """
-    pass
-
-
-def _get_interface_name(operation_name):
-    return operation_name.rsplit('.', 1)[0]

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/orchestrator/workflows/api/task_graph.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task_graph.py b/aria/orchestrator/workflows/api/task_graph.py
index c88d343..92a39d2 100644
--- a/aria/orchestrator/workflows/api/task_graph.py
+++ b/aria/orchestrator/workflows/api/task_graph.py
@@ -17,11 +17,11 @@
 Task graph. Used by users to build workflows
 """

-from uuid import uuid4
 from collections import Iterable

 from networkx import DiGraph, topological_sort

+from ....utils.uuid import generate_uuid
 from . import task as api_task
@@ -49,7 +49,7 @@ class TaskGraph(object):

     def __init__(self, name):
         self.name = name
-        self._id = str(uuid4())
+        self._id = generate_uuid(variant='uuid')
         self._graph = DiGraph()

     def __repr__(self):
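
The new factory methods replace the dotted-name parsing of the old `OperationTask.node`/`relationship` helpers. A hedged sketch of how a workflow body might now build a task; `node` and `graph` are assumed to come from the surrounding @workflow function:

    from aria.orchestrator.workflows.api.task import OperationTask

    task = OperationTask.for_node(node=node,
                                  interface_name='Standard',  # interface and operation are
                                  operation_name='create',    # passed separately now, not as
                                  inputs={'port': 8080})      # 'Standard.create'
    graph.add_tasks(task)

Plain input values such as the integer above are wrapped into `Parameter` models by the constructor, so callers no longer need to pre-wrap them.
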
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/orchestrator/workflows/builtin/execute_operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/execute_operation.py b/aria/orchestrator/workflows/builtin/execute_operation.py
index 5a7f6ce..348f47a 100644
--- a/aria/orchestrator/workflows/builtin/execute_operation.py
+++ b/aria/orchestrator/workflows/builtin/execute_operation.py
@@ -25,7 +25,8 @@ from ... import workflow
 def execute_operation(
         ctx,
         graph,
-        operation,
+        interface_name,
+        operation_name,
         operation_kwargs,
         allow_kwargs_override,
         run_by_dependency_order,
@@ -50,33 +51,33 @@ def execute_operation(
     """
     subgraphs = {}
     # filtering node instances
-    filtered_nodes = list(_filter_node_instances(
+    filtered_nodes = list(_filter_nodes(
         context=ctx,
         node_template_ids=node_template_ids,
         node_ids=node_ids,
         type_names=type_names))

     if run_by_dependency_order:
-        filtered_node_instances_ids = set(node_instance.id
-                                          for node_instance in filtered_nodes)
-        for node in ctx.node_instances:
-            if node.id not in filtered_node_instances_ids:
+        filtered_node_ids = set(node_instance.id for node_instance in filtered_nodes)
+        for node in ctx.nodes:
+            if node.id not in filtered_node_ids:
                 subgraphs[node.id] = ctx.task_graph(
                     name='execute_operation_stub_{0}'.format(node.id))

     # registering actual tasks to sequences
     for node in filtered_nodes:
         graph.add_tasks(
-            _create_node_instance_task(
-                nodes=node,
-                operation=operation,
+            _create_node_task(
+                node=node,
+                interface_name=interface_name,
+                operation_name=operation_name,
                 operation_kwargs=operation_kwargs,
                 allow_kwargs_override=allow_kwargs_override
             )
         )

-    for _, node_instance_sub_workflow in subgraphs.items():
-        graph.add_tasks(node_instance_sub_workflow)
+    for _, node_sub_workflow in subgraphs.items():
+        graph.add_tasks(node_sub_workflow)

     # adding tasks dependencies if required
     if run_by_dependency_order:
@@ -86,31 +87,32 @@ def execute_operation(
                 source_task=subgraphs[node.id], after=[subgraphs[relationship.target_id]])


-def _filter_node_instances(context, node_template_ids=(), node_ids=(), type_names=()):
-    def _is_node_by_id(node_id):
-        return not node_template_ids or node_id in node_template_ids
+def _filter_nodes(context, node_template_ids=(), node_ids=(), type_names=()):
+    def _is_node_template_by_id(node_template_id):
+        return not node_template_ids or node_template_id in node_template_ids

-    def _is_node_instance_by_id(node_instance_id):
-        return not node_ids or node_instance_id in node_ids
+    def _is_node_by_id(node_id):
+        return not node_ids or node_id in node_ids

-    def _is_node_by_type(node_type_hierarchy):
-        return not type_names or node_type_hierarchy in type_names
+    def _is_node_by_type(node_type):
+        return not type_names or node_type.name in type_names

     for node in context.nodes:
-        if all((_is_node_by_id(node.node_template.id),
-                _is_node_instance_by_id(node.id),
-                _is_node_by_type(node.node_template.type_hierarchy))):
+        if all((_is_node_template_by_id(node.node_template.id),
+                _is_node_by_id(node.id),
+                _is_node_by_type(node.node_template.type))):
             yield node


-def _create_node_instance_task(
-        nodes,
-        operation,
+def _create_node_task(
+        node,
+        interface_name,
+        operation_name,
         operation_kwargs,
         allow_kwargs_override):
     """
     A workflow which executes a single operation
-    :param nodes: the node instance to install
+    :param node: the node instance to install
     :param basestring operation: the operation name
     :param dict operation_kwargs:
     :param bool allow_kwargs_override:
@@ -120,7 +122,8 @@
     if allow_kwargs_override is not None:
         operation_kwargs['allow_kwargs_override'] = allow_kwargs_override

-    return OperationTask.node(
-        instance=nodes,
-        name=operation,
+    return OperationTask.for_node(
+        node=node,
+        interface_name=interface_name,
+        operation_name=operation_name,
         inputs=operation_kwargs)
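
A caller that previously passed `operation='Standard.create'` now supplies the two names separately. A hedged invocation sketch; the trailing filter parameters are inferred from the `_filter_nodes` call, since the full signature is not shown in the hunk:

    execute_operation(ctx=ctx,
                      graph=graph,
                      interface_name='Standard',
                      operation_name='create',
                      operation_kwargs={},
                      allow_kwargs_override=None,
                      run_by_dependency_order=True,
                      type_names=(),
                      node_template_ids=(),
                      node_ids=())
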
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/orchestrator/workflows/builtin/heal.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/heal.py b/aria/orchestrator/workflows/builtin/heal.py
index 2592323..92b96ea 100644
--- a/aria/orchestrator/workflows/builtin/heal.py
+++ b/aria/orchestrator/workflows/builtin/heal.py
@@ -26,156 +26,156 @@ from ..api import task

 @workflow
-def heal(ctx, graph, node_instance_id):
+def heal(ctx, graph, node_id):
     """
     The heal workflow

     :param WorkflowContext ctx: the workflow context
     :param TaskGraph graph: the graph which will describe the workflow.
-    :param node_instance_id: the id of the node instance to heal
+    :param node_id: the id of the node to heal
     :return:
     """
-    failing_node = ctx.model.node.get(node_instance_id)
+    failing_node = ctx.model.node.get(node_id)
     host_node = ctx.model.node.get(failing_node.host.id)
-    failed_node_instance_subgraph = _get_contained_subgraph(ctx, host_node)
-    failed_node_instance_ids = list(n.id for n in failed_node_instance_subgraph)
+    failed_node_subgraph = _get_contained_subgraph(ctx, host_node)
+    failed_node_ids = list(n.id for n in failed_node_subgraph)

-    targeted_node_instances = [node_instance for node_instance in ctx.node_instances
-                               if node_instance.id not in failed_node_instance_ids]
+    targeted_nodes = [node for node in ctx.nodes
+                      if node.id not in failed_node_ids]

     uninstall_subgraph = task.WorkflowTask(
         heal_uninstall,
-        failing_node_instances=failed_node_instance_subgraph,
-        targeted_node_instances=targeted_node_instances
+        failing_nodes=failed_node_subgraph,
+        targeted_nodes=targeted_nodes
     )

     install_subgraph = task.WorkflowTask(
         heal_install,
-        failing_node_instances=failed_node_instance_subgraph,
-        targeted_node_instances=targeted_node_instances)
+        failing_nodes=failed_node_subgraph,
+        targeted_nodes=targeted_nodes)

     graph.sequence(uninstall_subgraph, install_subgraph)


-@workflow(suffix_template='{failing_node_instances}')
-def heal_uninstall(ctx, graph, failing_node_instances, targeted_node_instances):
+@workflow(suffix_template='{failing_nodes}')
+def heal_uninstall(ctx, graph, failing_nodes, targeted_nodes):
     """
     the uninstall part of the heal mechanism

     :param WorkflowContext ctx: the workflow context
     :param TaskGraph graph: the task graph to edit.
-    :param failing_node_instances: the failing nodes to heal.
-    :param targeted_node_instances: the targets of the relationships where the failing node are
+    :param failing_nodes: the failing nodes to heal.
+    :param targeted_nodes: the targets of the relationships where the failing nodes are the
     source
     :return:
     """
-    node_instance_sub_workflows = {}
-
-    # Create install stub workflow for each unaffected node instance
-    for node_instance in targeted_node_instances:
-        node_instance_stub = task.StubTask()
-        node_instance_sub_workflows[node_instance.id] = node_instance_stub
-        graph.add_tasks(node_instance_stub)
-
-    # create install sub workflow for every node instance
-    for node_instance in failing_node_instances:
-        node_instance_sub_workflow = task.WorkflowTask(uninstall_node,
-                                                       node_instance=node_instance)
-        node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow
-        graph.add_tasks(node_instance_sub_workflow)
-
-    # create dependencies between the node instance sub workflow
-    for node_instance in failing_node_instances:
-        node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
-        for relationship_instance in reversed(node_instance.outbound_relationship_instances):
+    node_sub_workflows = {}
+
+    # Create install stub workflow for each unaffected node
+    for node in targeted_nodes:
+        node_stub = task.StubTask()
+        node_sub_workflows[node.id] = node_stub
+        graph.add_tasks(node_stub)
+
+    # create install sub workflow for every node
+    for node in failing_nodes:
+        node_sub_workflow = task.WorkflowTask(uninstall_node,
+                                              node=node)
+        node_sub_workflows[node.id] = node_sub_workflow
+        graph.add_tasks(node_sub_workflow)
+
+    # create dependencies between the node sub workflows
+    for node in failing_nodes:
+        node_sub_workflow = node_sub_workflows[node.id]
+        for relationship in reversed(node.outbound_relationships):
             graph.add_dependency(
-                node_instance_sub_workflows[relationship_instance.target_node_instance.id],
-                node_instance_sub_workflow)
+                node_sub_workflows[relationship.target_node.id],
+                node_sub_workflow)

-    # Add operations for intact nodes depending on a node instance belonging to node_instances
-    for node_instance in targeted_node_instances:
-        node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
+    # Add operations for intact nodes depending on a node belonging to nodes
+    for node in targeted_nodes:
+        node_sub_workflow = node_sub_workflows[node.id]

-        for relationship_instance in reversed(node_instance.outbound_relationship_instances):
+        for relationship in reversed(node.outbound_relationships):

-            target_node_instance = \
-                ctx.model.node.get(relationship_instance.target_node_instance.id)
-            target_node_instance_subgraph = node_instance_sub_workflows[target_node_instance.id]
-            graph.add_dependency(target_node_instance_subgraph, node_instance_sub_workflow)
+            target_node = \
+                ctx.model.node.get(relationship.target_node.id)
+            target_node_subgraph = node_sub_workflows[target_node.id]
+            graph.add_dependency(target_node_subgraph, node_sub_workflow)

-            if target_node_instance in failing_node_instances:
+            if target_node in failing_nodes:
                 dependency = relationship_tasks(
-                    relationship_instance=relationship_instance,
+                    relationship=relationship,
                     operation_name='aria.interfaces.relationship_lifecycle.unlink')
                 graph.add_tasks(*dependency)
-                graph.add_dependency(node_instance_sub_workflow, dependency)
+                graph.add_dependency(node_sub_workflow, dependency)


-@workflow(suffix_template='{failing_node_instances}')
-def heal_install(ctx, graph, failing_node_instances, targeted_node_instances):
+@workflow(suffix_template='{failing_nodes}')
+def heal_install(ctx, graph, failing_nodes, targeted_nodes):
     """
     the install part of the heal mechanism

     :param WorkflowContext ctx: the workflow context
     :param TaskGraph graph: the task graph to edit.
-    :param failing_node_instances: the failing nodes to heal.
-    :param targeted_node_instances: the targets of the relationships where the failing node are
+    :param failing_nodes: the failing nodes to heal.
+    :param targeted_nodes: the targets of the relationships where the failing nodes are the
     source
     :return:
     """
-    node_instance_sub_workflows = {}
+    node_sub_workflows = {}

     # Create install sub workflow for each unaffected
-    for node_instance in targeted_node_instances:
-        node_instance_stub = task.StubTask()
-        node_instance_sub_workflows[node_instance.id] = node_instance_stub
-        graph.add_tasks(node_instance_stub)
-
-    # create install sub workflow for every node instance
-    for node_instance in failing_node_instances:
-        node_instance_sub_workflow = task.WorkflowTask(install_node,
-                                                       node_instance=node_instance)
-        node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow
-        graph.add_tasks(node_instance_sub_workflow)
-
-    # create dependencies between the node instance sub workflow
-    for node_instance in failing_node_instances:
-        node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
-        if node_instance.outbound_relationship_instances:
+    for node in targeted_nodes:
+        node_stub = task.StubTask()
+        node_sub_workflows[node.id] = node_stub
+        graph.add_tasks(node_stub)
+
+    # create install sub workflow for every node
+    for node in failing_nodes:
+        node_sub_workflow = task.WorkflowTask(install_node,
+                                              node=node)
+        node_sub_workflows[node.id] = node_sub_workflow
+        graph.add_tasks(node_sub_workflow)
+
+    # create dependencies between the node sub workflows
+    for node in failing_nodes:
+        node_sub_workflow = node_sub_workflows[node.id]
+        if node.outbound_relationships:
             dependencies = \
-                [node_instance_sub_workflows[relationship_instance.target_node_instance.id]
-                 for relationship_instance in node_instance.outbound_relationship_instances]
-            graph.add_dependency(node_instance_sub_workflow, dependencies)
-
-    # Add operations for intact nodes depending on a node instance
-    # belonging to node_instances
-    for node_instance in targeted_node_instances:
-        node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
-
-        for relationship_instance in node_instance.outbound_relationship_instances:
-            target_node_instance = ctx.model.node.get(
-                relationship_instance.target_node_instance.id)
-            target_node_instance_subworkflow = node_instance_sub_workflows[target_node_instance.id]
-            graph.add_dependency(node_instance_sub_workflow, target_node_instance_subworkflow)
-
-            if target_node_instance in failing_node_instances:
+                [node_sub_workflows[relationship.target_node.id]
+                 for relationship in node.outbound_relationships]
+            graph.add_dependency(node_sub_workflow, dependencies)
+
+    # Add operations for intact nodes depending on a node
+    # belonging to nodes
+    for node in targeted_nodes:
+        node_sub_workflow = node_sub_workflows[node.id]
+
+        for relationship in node.outbound_relationships:
+            target_node = ctx.model.node.get(
+                relationship.target_node.id)
+            target_node_subworkflow = node_sub_workflows[target_node.id]
+            graph.add_dependency(node_sub_workflow, target_node_subworkflow)
+
+            if target_node in failing_nodes:
                 dependent = relationship_tasks(
-                    relationship_instance=relationship_instance,
+                    relationship=relationship,
                     operation_name='aria.interfaces.relationship_lifecycle.establish')
                 graph.add_tasks(*dependent)
-                graph.add_dependency(dependent, node_instance_sub_workflow)
+                graph.add_dependency(dependent, node_sub_workflow)

-def _get_contained_subgraph(context, host_node_instance):
-    contained_instances = [node_instance
-                           for node_instance in context.node_instances
-                           if node_instance.host_fk == host_node_instance.id and
-                           node_instance.host_fk != node_instance.id]
-    result = [host_node_instance]
+def _get_contained_subgraph(context, host_node):
+    contained_instances = [node
+                           for node in context.nodes
+                           if node.host_fk == host_node.id and
+                           node.host_fk != node.id]
+    result = [host_node]

     if not contained_instances:
         return result

     result.extend(contained_instances)
-    for node_instance in contained_instances:
-        result.extend(_get_contained_subgraph(context, node_instance))
+    for node in contained_instances:
+        result.extend(_get_contained_subgraph(context, node))

     return set(result)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/orchestrator/workflows/builtin/utils.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/utils.py b/aria/orchestrator/workflows/builtin/utils.py
index c9dbc6b..8efa889 100644
--- a/aria/orchestrator/workflows/builtin/utils.py
+++ b/aria/orchestrator/workflows/builtin/utils.py
@@ -14,20 +14,24 @@
 # limitations under the License.

 from ..api.task import OperationTask
+from .. import exceptions


-def create_node_task(operation_name, node):
+def create_node_task(interface_name, operation_name, node):
     """
     Returns a new operation task if the operation exists in the node, otherwise returns None.
     """

-    if _has_operation(node.interfaces, operation_name):
-        return OperationTask.node(instance=node,
-                                  name=operation_name)
-    return None
+    try:
+        return OperationTask.for_node(node=node,
+                                      interface_name=interface_name,
+                                      operation_name=operation_name)
+    except exceptions.OperationNotFoundException:
+        # We will skip nodes which do not have the operation
+        return None


-def create_relationship_tasks(operation_name, runs_on, node):
+def create_relationship_tasks(interface_name, operation_name, runs_on, node):
     """
     Returns a list of operation tasks for each outbound relationship of the node if
     the operation exists there.
@@ -35,12 +39,15 @@
     sequence = []
     for relationship in node.outbound_relationships:
-        if _has_operation(relationship.interfaces, operation_name):
+        try:
             sequence.append(
-                OperationTask.relationship(instance=relationship,
-                                           name=operation_name,
-                                           edge='source',
-                                           runs_on=runs_on))
+                OperationTask.for_relationship(relationship=relationship,
+                                               interface_name=interface_name,
+                                               operation_name=operation_name,
+                                               runs_on=runs_on))
+        except exceptions.OperationNotFoundException:
+            # We will skip relationships which do not have the operation
+            pass
     return sequence


@@ -49,16 +56,16 @@ def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False):
     Creates dependencies between tasks if there is a relationship (outbound) between their nodes.
""" - def get_task(node_id): + def get_task(node_name): for task, node in tasks_and_nodes: - if node.id == node_id: + if node.name == node_name: return task return None for task, node in tasks_and_nodes: dependencies = [] for relationship in node.outbound_relationships: - dependency = get_task(relationship.target_node.id) + dependency = get_task(relationship.target_node.name) if dependency: dependencies.append(dependency) if dependencies: @@ -67,10 +74,3 @@ def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False): graph.add_dependency(dependency, task) else: graph.add_dependency(task, dependencies) - - -def _has_operation(interfaces, operation_name): - for interface in interfaces: - if interface.operations.filter_by(name=operation_name).count() == 1: - return True - return False http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/orchestrator/workflows/builtin/workflows.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/workflows.py b/aria/orchestrator/workflows/builtin/workflows.py index 180b4e9..6065343 100644 --- a/aria/orchestrator/workflows/builtin/workflows.py +++ b/aria/orchestrator/workflows/builtin/workflows.py @@ -14,31 +14,32 @@ # limitations under the License. """ -A set of builtin workflows. +TSOCA normative lifecycle workflows. """ -from .utils import (create_node_task, create_relationship_tasks) from ... import workflow +from ....modeling.models import Task +from .utils import (create_node_task, create_relationship_tasks) NORMATIVE_STANDARD_INTERFACE = 'Standard' # 'tosca.interfaces.node.lifecycle.Standard' NORMATIVE_CONFIGURE_INTERFACE = 'Configure' # 'tosca.interfaces.relationship.Configure' -NORMATIVE_CREATE = NORMATIVE_STANDARD_INTERFACE + '.create' -NORMATIVE_START = NORMATIVE_STANDARD_INTERFACE + '.start' -NORMATIVE_STOP = NORMATIVE_STANDARD_INTERFACE + '.stop' -NORMATIVE_DELETE = NORMATIVE_STANDARD_INTERFACE + '.delete' +NORMATIVE_CREATE = 'create' +NORMATIVE_START = 'start' +NORMATIVE_STOP = 'stop' +NORMATIVE_DELETE = 'delete' -NORMATIVE_CONFIGURE = NORMATIVE_STANDARD_INTERFACE + '.configure' -NORMATIVE_PRE_CONFIGURE_SOURCE = NORMATIVE_CONFIGURE_INTERFACE + '.pre_configure_source' -NORMATIVE_PRE_CONFIGURE_TARGET = NORMATIVE_CONFIGURE_INTERFACE + '.pre_configure_target' -NORMATIVE_POST_CONFIGURE_SOURCE = NORMATIVE_CONFIGURE_INTERFACE + '.post_configure_source' -NORMATIVE_POST_CONFIGURE_TARGET = NORMATIVE_CONFIGURE_INTERFACE + '.post_configure_target' +NORMATIVE_CONFIGURE = 'configure' +NORMATIVE_PRE_CONFIGURE_SOURCE = 'pre_configure_source' +NORMATIVE_PRE_CONFIGURE_TARGET = 'pre_configure_target' +NORMATIVE_POST_CONFIGURE_SOURCE = 'post_configure_source' +NORMATIVE_POST_CONFIGURE_TARGET = 'post_configure_target' -NORMATIVE_ADD_SOURCE = NORMATIVE_CONFIGURE_INTERFACE + '.add_source' -NORMATIVE_ADD_TARGET = NORMATIVE_CONFIGURE_INTERFACE + '.add_target' -NORMATIVE_REMOVE_TARGET = NORMATIVE_CONFIGURE_INTERFACE + '.remove_target' -NORMATIVE_TARGET_CHANGED = NORMATIVE_CONFIGURE_INTERFACE + '.target_changed' +NORMATIVE_ADD_SOURCE = 'add_source' +NORMATIVE_ADD_TARGET = 'add_target' +NORMATIVE_REMOVE_TARGET = 'remove_target' +NORMATIVE_TARGET_CHANGED = 'target_changed' __all__ = ( @@ -64,40 +65,40 @@ __all__ = ( ) -@workflow(suffix_template='{node.id}') +@workflow(suffix_template='{node.name}') def install_node(graph, node, **kwargs): sequence = [] # Create sequence.append( create_node_task( - NORMATIVE_CREATE, + NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE, 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/orchestrator/workflows/builtin/workflows.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/workflows.py b/aria/orchestrator/workflows/builtin/workflows.py
index 180b4e9..6065343 100644
--- a/aria/orchestrator/workflows/builtin/workflows.py
+++ b/aria/orchestrator/workflows/builtin/workflows.py
@@ -14,31 +14,32 @@
 # limitations under the License.

 """
-A set of builtin workflows.
+TOSCA normative lifecycle workflows.
 """

-from .utils import (create_node_task, create_relationship_tasks)
 from ... import workflow
+from ....modeling.models import Task
+from .utils import (create_node_task, create_relationship_tasks)


 NORMATIVE_STANDARD_INTERFACE = 'Standard' # 'tosca.interfaces.node.lifecycle.Standard'
 NORMATIVE_CONFIGURE_INTERFACE = 'Configure' # 'tosca.interfaces.relationship.Configure'

-NORMATIVE_CREATE = NORMATIVE_STANDARD_INTERFACE + '.create'
-NORMATIVE_START = NORMATIVE_STANDARD_INTERFACE + '.start'
-NORMATIVE_STOP = NORMATIVE_STANDARD_INTERFACE + '.stop'
-NORMATIVE_DELETE = NORMATIVE_STANDARD_INTERFACE + '.delete'
+NORMATIVE_CREATE = 'create'
+NORMATIVE_START = 'start'
+NORMATIVE_STOP = 'stop'
+NORMATIVE_DELETE = 'delete'

-NORMATIVE_CONFIGURE = NORMATIVE_STANDARD_INTERFACE + '.configure'
-NORMATIVE_PRE_CONFIGURE_SOURCE = NORMATIVE_CONFIGURE_INTERFACE + '.pre_configure_source'
-NORMATIVE_PRE_CONFIGURE_TARGET = NORMATIVE_CONFIGURE_INTERFACE + '.pre_configure_target'
-NORMATIVE_POST_CONFIGURE_SOURCE = NORMATIVE_CONFIGURE_INTERFACE + '.post_configure_source'
-NORMATIVE_POST_CONFIGURE_TARGET = NORMATIVE_CONFIGURE_INTERFACE + '.post_configure_target'
+NORMATIVE_CONFIGURE = 'configure'
+NORMATIVE_PRE_CONFIGURE_SOURCE = 'pre_configure_source'
+NORMATIVE_PRE_CONFIGURE_TARGET = 'pre_configure_target'
+NORMATIVE_POST_CONFIGURE_SOURCE = 'post_configure_source'
+NORMATIVE_POST_CONFIGURE_TARGET = 'post_configure_target'

-NORMATIVE_ADD_SOURCE = NORMATIVE_CONFIGURE_INTERFACE + '.add_source'
-NORMATIVE_ADD_TARGET = NORMATIVE_CONFIGURE_INTERFACE + '.add_target'
-NORMATIVE_REMOVE_TARGET = NORMATIVE_CONFIGURE_INTERFACE + '.remove_target'
-NORMATIVE_TARGET_CHANGED = NORMATIVE_CONFIGURE_INTERFACE + '.target_changed'
+NORMATIVE_ADD_SOURCE = 'add_source'
+NORMATIVE_ADD_TARGET = 'add_target'
+NORMATIVE_REMOVE_TARGET = 'remove_target'
+NORMATIVE_TARGET_CHANGED = 'target_changed'


 __all__ = (
@@ -64,40 +65,40 @@ __all__ = (
 )


-@workflow(suffix_template='{node.id}')
+@workflow(suffix_template='{node.name}')
 def install_node(graph, node, **kwargs):
     sequence = []

     # Create
     sequence.append(
         create_node_task(
-            NORMATIVE_CREATE,
+            NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE,
             node))

     # Configure
     sequence += \
         create_relationship_tasks(
-            NORMATIVE_PRE_CONFIGURE_SOURCE,
-            'source',
+            NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_PRE_CONFIGURE_SOURCE,
+            Task.RUNS_ON_SOURCE,
             node)
     sequence += \
         create_relationship_tasks(
-            NORMATIVE_PRE_CONFIGURE_TARGET,
-            'target',
+            NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_PRE_CONFIGURE_TARGET,
+            Task.RUNS_ON_TARGET,
             node)
     sequence.append(
         create_node_task(
-            NORMATIVE_CONFIGURE,
+            NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE,
             node))
     sequence += \
         create_relationship_tasks(
-            NORMATIVE_POST_CONFIGURE_SOURCE,
-            'source',
+            NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_POST_CONFIGURE_SOURCE,
+            Task.RUNS_ON_SOURCE,
             node)
     sequence += \
         create_relationship_tasks(
-            NORMATIVE_POST_CONFIGURE_TARGET,
-            'target',
+            NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_POST_CONFIGURE_TARGET,
+            Task.RUNS_ON_TARGET,
             node)

     # Start
@@ -106,7 +107,7 @@ def install_node(graph, node, **kwargs):
     graph.sequence(*sequence)


-@workflow(suffix_template='{node.id}')
+@workflow(suffix_template='{node.name}')
 def uninstall_node(graph, node, **kwargs):
     # Stop
     sequence = _create_stop_tasks(node)
@@ -114,18 +115,18 @@
     # Delete
     sequence.append(
         create_node_task(
-            NORMATIVE_DELETE,
+            NORMATIVE_STANDARD_INTERFACE, NORMATIVE_DELETE,
             node))

     graph.sequence(*sequence)


-@workflow(suffix_template='{node.id}')
+@workflow(suffix_template='{node.name}')
 def start_node(graph, node, **kwargs):
     graph.sequence(*_create_start_tasks(node))


-@workflow(suffix_template='{node.id}')
+@workflow(suffix_template='{node.name}')
 def stop_node(graph, node, **kwargs):
     graph.sequence(*_create_stop_tasks(node))

@@ -134,22 +135,22 @@ def _create_start_tasks(node):
     sequence = []
     sequence.append(
         create_node_task(
-            NORMATIVE_START,
+            NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START,
             node))
     sequence += \
         create_relationship_tasks(
-            NORMATIVE_ADD_SOURCE,
-            'source',
+            NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_ADD_SOURCE,
+            Task.RUNS_ON_SOURCE,
             node)
     sequence += \
         create_relationship_tasks(
-            NORMATIVE_ADD_TARGET,
-            'target',
+            NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_ADD_TARGET,
+            Task.RUNS_ON_TARGET,
             node)
     sequence += \
         create_relationship_tasks(
-            NORMATIVE_TARGET_CHANGED,
-            'target',
+            NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_TARGET_CHANGED,
+            Task.RUNS_ON_TARGET,
             node)
     return sequence

@@ -158,16 +159,16 @@ def _create_stop_tasks(node):
     sequence = []
     sequence += \
         create_relationship_tasks(
-            NORMATIVE_REMOVE_TARGET,
-            'target',
+            NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_REMOVE_TARGET,
+            Task.RUNS_ON_TARGET,
             node)
     sequence += \
         create_relationship_tasks(
-            NORMATIVE_TARGET_CHANGED,
-            'target',
+            NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_TARGET_CHANGED,
+            Task.RUNS_ON_TARGET,
             node)
     sequence.append(
         create_node_task(
-            NORMATIVE_STOP,
+            NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP,
             node))
     return sequence
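
Call sites now pass an interface name, an operation name, and a `Task.RUNS_ON_*` constant instead of a dotted operation name plus an edge string. A hedged sketch of a custom workflow reusing these helpers; `restart_node` is hypothetical:

    from aria.orchestrator import workflow
    from aria.modeling.models import Task
    from aria.orchestrator.workflows.builtin.utils import (create_node_task,
                                                           create_relationship_tasks)

    @workflow(suffix_template='{node.name}')
    def restart_node(graph, node, **kwargs):
        # Stop, then start, with the split interface/operation names
        sequence = [create_node_task('Standard', 'stop', node)]
        sequence += create_relationship_tasks(
            'Configure', 'target_changed', Task.RUNS_ON_TARGET, node)
        sequence.append(create_node_task('Standard', 'start', node))
        # create_node_task returns None when the node lacks the operation
        graph.sequence(*[t for t in sequence if t is not None])
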
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index c6ac2b3..fa4550d 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -23,7 +23,7 @@ from datetime import datetime
 import networkx

 from aria import logger
-from aria.storage.modeling import model
+from aria.modeling import models
 from aria.orchestrator import events

 from .. import exceptions
@@ -82,18 +82,18 @@ class Engine(logger.LoggerMixin):
         events.on_cancelling_workflow_signal.send(self._workflow_context)

     def _is_cancel(self):
-        return self._workflow_context.execution.status in [model.Execution.CANCELLING,
-                                                           model.Execution.CANCELLED]
+        return self._workflow_context.execution.status in [models.Execution.CANCELLING,
+                                                           models.Execution.CANCELLED]

     def _executable_tasks(self):
         now = datetime.utcnow()
         return (task for task in self._tasks_iter()
-                if task.status in model.Task.WAIT_STATES and
+                if task.status in models.Task.WAIT_STATES and
                 task.due_at <= now and
                 not self._task_has_dependencies(task))

     def _ended_tasks(self):
-        return (task for task in self._tasks_iter() if task.status in model.Task.END_STATES)
+        return (task for task in self._tasks_iter() if task.status in models.Task.END_STATES)

     def _task_has_dependencies(self, task):
         return len(self._execution_graph.pred.get(task.id, {})) > 0
@@ -105,19 +105,19 @@ class Engine(logger.LoggerMixin):
         for _, data in self._execution_graph.nodes_iter(data=True):
             task = data['task']
             if isinstance(task, engine_task.OperationTask):
-                if task.model_task.status not in model.Task.END_STATES:
+                if task.model_task.status not in models.Task.END_STATES:
                     self._workflow_context.model.task.refresh(task.model_task)
                 yield task

     def _handle_executable_task(self, task):
         if isinstance(task, engine_task.StubTask):
-            task.status = model.Task.SUCCESS
+            task.status = models.Task.SUCCESS
         else:
             events.sent_task_signal.send(task)
             self._executor.execute(task)

     def _handle_ended_tasks(self, task):
-        if task.status == model.Task.FAILED and not task.ignore_failure:
+        if task.status == models.Task.FAILED and not task.ignore_failure:
             raise exceptions.ExecutorException('Workflow failed')
         else:
             self._execution_graph.remove_node(task.id)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index c973ad9..a420d2b 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -14,7 +14,7 @@
 # limitations under the License.

 """
-Aria's events Sub-Package
+ARIA's events Sub-Package

 Path: aria.events.storage_event_handler

 Implementation of storage handlers for workflow and operation events.
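
The engine diff above shows its polling criterion: a task is executable when its status is a wait state, its retry timer has expired, and no predecessor in the execution graph is still pending. A minimal sketch of that readiness predicate; the status set here is an illustrative stand-in for `models.Task.WAIT_STATES`:

    from datetime import datetime

    WAIT_STATES = frozenset(['pending', 'retrying'])  # stand-in for models.Task.WAIT_STATES

    def is_executable(task, has_dependencies):
        # Ready to run: waiting, due, and not blocked by an unfinished predecessor
        return (task.status in WAIT_STATES and
                task.due_at <= datetime.utcnow() and
                not has_dependencies(task))
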
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index 9f63bcf..f23312d 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -16,6 +16,7 @@
 """
 Workflow tasks
 """
+
 from contextlib import contextmanager
 from datetime import datetime
 from functools import (
@@ -23,9 +24,9 @@ from functools import (
     wraps,
 )

-from aria.storage.modeling import model
-from aria.orchestrator.context import operation as operation_context
+from ....modeling import models
+from ...context import operation as operation_context

 from .. import exceptions
@@ -36,7 +37,7 @@ def _locked(func=None):
     @wraps(func)
     def _wrapper(self, value, **kwargs):
         if self._update_fields is None:
-            raise exceptions.TaskException("Task is not in update mode")
+            raise exceptions.TaskException('Task is not in update mode')
         return func(self, value, **kwargs)
     return _wrapper
@@ -65,66 +66,61 @@ class BaseTask(object):

     def __init__(self, *args, **kwargs):
         super(StubTask, self).__init__(*args, **kwargs)
-        self.status = model.Task.PENDING
+        self.status = models.Task.PENDING
         self.due_at = datetime.utcnow()


 class StartWorkflowTask(StubTask):
     """
-    Tasks marking a workflow start
+    Task marking a workflow start
     """
     pass


 class EndWorkflowTask(StubTask):
     """
-    Tasks marking a workflow end
+    Task marking a workflow end
     """
     pass


 class StartSubWorkflowTask(StubTask):
     """
-    Tasks marking a subworkflow start
+    Task marking a subworkflow start
     """
     pass


 class EndSubWorkflowTask(StubTask):
     """
-    Tasks marking a subworkflow end
+    Task marking a subworkflow end
     """
     pass


 class OperationTask(BaseTask):
     """
-    Operation tasks
+    Operation task
     """

     def __init__(self, api_task, *args, **kwargs):
         super(OperationTask, self).__init__(id=api_task.id, **kwargs)
         self._workflow_context = api_task._workflow_context
         model_storage = api_task._workflow_context.model
+        plugin = api_task.plugin

         base_task_model = model_storage.task.model_cls
-        if isinstance(api_task.actor, model.Node):
+        if isinstance(api_task.actor, models.Node):
             context_cls = operation_context.NodeOperationContext
-            task_model_cls = base_task_model.as_node_instance
-        elif isinstance(api_task.actor, model.Relationship):
+            create_task_model = base_task_model.for_node
+        elif isinstance(api_task.actor, models.Relationship):
             context_cls = operation_context.RelationshipOperationContext
-            task_model_cls = base_task_model.as_relationship_instance
+            create_task_model = base_task_model.for_relationship
         else:
             raise RuntimeError('No operation context could be created for {actor.model_cls}'
                                .format(actor=api_task.actor))
-        plugin = api_task.plugin
-        plugins = model_storage.plugin.list(filters={
-            'package_name': plugin.get('package_name', ''),
-            'package_version': plugin.get('package_version', '')
-        })
-        # Validation during installation ensures that at most one plugin can exists with provided
-        # package_name and package_version
-        operation_task = task_model_cls(
+
+        task_model = create_task_model(
             name=api_task.name,
             implementation=api_task.implementation,
             instance=api_task.actor,
@@ -133,22 +129,21 @@ class OperationTask(BaseTask):
             max_attempts=api_task.max_attempts,
             retry_interval=api_task.retry_interval,
             ignore_failure=api_task.ignore_failure,
-            plugin=plugins[0] if plugins else None,
-            plugin_name=plugin.get('name'),
+            plugin=plugin,
             execution=self._workflow_context.execution,
             runs_on=api_task.runs_on
         )
-        self._workflow_context.model.task.put(operation_task)
+        self._workflow_context.model.task.put(task_model)

         self._ctx = context_cls(name=api_task.name,
                                 model_storage=self._workflow_context.model,
                                 resource_storage=self._workflow_context.resource,
-                                service_instance_id=self._workflow_context._service_instance_id,
-                                task_id=operation_task.id,
+                                service_id=self._workflow_context._service_id,
+                                task_id=task_model.id,
                                 actor_id=api_task.actor.id,
                                 execution_id=self._workflow_context._execution_id,
                                 workdir=self._workflow_context._workdir)
-        self._task_id = operation_task.id
+        self._task_id = task_model.id
         self._update_fields = None

     @contextmanager
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/orchestrator/workflows/events_logging.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py
index 73d8994..e831bfe 100644
--- a/aria/orchestrator/workflows/events_logging.py
+++ b/aria/orchestrator/workflows/events_logging.py
@@ -15,7 +15,7 @@

 """
-Aria's events Sub-Package
+ARIA's events Sub-Package

 Path: aria.events.storage_event_handler

 Implementation of logger handlers for workflow and operation events.

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/orchestrator/workflows/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/exceptions.py b/aria/orchestrator/workflows/exceptions.py
index e2f5b59..4fb8dd7 100644
--- a/aria/orchestrator/workflows/exceptions.py
+++ b/aria/orchestrator/workflows/exceptions.py
@@ -68,4 +68,15 @@ class TaskException(exceptions.AriaError):
     """
     Raised by the task
     """
-    pass
+
+
+class OperationNotFoundException(TaskException):
+    """
+    Could not find an operation on the node or relationship.
+    """
+
+
+class PluginNotFoundException(TaskException):
+    """
+    Could not find a plugin matching the plugin specification.
+    """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/orchestrator/workflows/executor/celery.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/celery.py b/aria/orchestrator/workflows/executor/celery.py
index 8a096b5..baa0375 100644
--- a/aria/orchestrator/workflows/executor/celery.py
+++ b/aria/orchestrator/workflows/executor/celery.py
@@ -44,7 +44,7 @@ class CeleryExecutor(BaseExecutor):

     def execute(self, task):
         self._tasks[task.id] = task
-        inputs = task.inputs.copy()
+        inputs = dict((k, v.value) for k, v in task.inputs.iteritems())
         inputs['ctx'] = task.context
         self._results[task.id] = self._app.send_task(
             task.operation_mapping,
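
All three executors now unwrap `Parameter` models back to plain values before invoking the operation function, mirroring the wrapping done in the API-level `OperationTask`. A hedged round-trip sketch with a stub `Parameter`; the real model lives in `aria.modeling.models`:

    class Parameter(object):
        # Stub exposing the same wrap()/.value surface used in the diffs above
        def __init__(self, name, value):
            self.name, self.value = name, value

        @classmethod
        def wrap(cls, name, value):
            return value if isinstance(value, cls) else cls(name, value)

    # Wrapping (api/task.py side):
    inputs = {'port': 8080}
    wrapped = dict((k, Parameter.wrap(k, v)) for k, v in inputs.items())

    # Unwrapping (executor side), as in the celery/thread/process executors:
    plain = dict((k, v.value) for k, v in wrapped.items())
    assert plain == {'port': 8080}
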
# See docstring of `remove_mutable_association_listener` for further details - storage_type.remove_mutable_association_listener() + modeling_types.remove_mutable_association_listener() + with instrumentation.track_changes() as instrument: try: ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context']) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py index 6c59986..1a49af5 100644 --- a/aria/orchestrator/workflows/executor/thread.py +++ b/aria/orchestrator/workflows/executor/thread.py @@ -58,7 +58,8 @@ class ThreadExecutor(BaseExecutor): self._task_started(task) try: task_func = imports.load_attribute(task.implementation) - task_func(ctx=task.context, **task.inputs) + inputs = dict((k, v.value) for k, v in task.inputs.iteritems()) + task_func(ctx=task.context, **inputs) self._task_succeeded(task) except BaseException as e: self._task_failed(task, exception=e) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/parser/consumption/__init__.py ---------------------------------------------------------------------- diff --git a/aria/parser/consumption/__init__.py b/aria/parser/consumption/__init__.py index 7b7590e..7da8490 100644 --- a/aria/parser/consumption/__init__.py +++ b/aria/parser/consumption/__init__.py @@ -20,7 +20,7 @@ from .style import Style from .consumer import Consumer, ConsumerChain from .presentation import Read from .validation import Validate -from .modeling import Model, Types, Instance +from .modeling import ServiceTemplate, Types, ServiceInstance from .inputs import Inputs __all__ = ( @@ -31,7 +31,7 @@ __all__ = ( 'ConsumerChain', 'Read', 'Validate', - 'Model', + 'ServiceTemplate', 'Types', - 'Instance', + 'ServiceInstance', 'Inputs') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/parser/consumption/modeling.py ---------------------------------------------------------------------- diff --git a/aria/parser/consumption/modeling.py b/aria/parser/consumption/modeling.py index 599c260..4847ba7 100644 --- a/aria/parser/consumption/modeling.py +++ b/aria/parser/consumption/modeling.py @@ -17,61 +17,65 @@ from ...utils.formatting import json_dumps, yaml_dumps from .consumer import Consumer, ConsumerChain -class Derive(Consumer): +class DeriveServiceTemplate(Consumer): """ - Derives the service model. + Derives the service template from the presenter. 
""" def consume(self): if self.context.presentation.presenter is None: - self.context.validation.report('Derive consumer: missing presenter') + self.context.validation.report('DeriveServiceTemplate consumer: missing presenter') return - if not hasattr(self.context.presentation.presenter, '_get_service_model'): - self.context.validation.report('Derive consumer: presenter does not support ' - '"_get_service_model"') + if not hasattr(self.context.presentation.presenter, '_get_model'): + self.context.validation.report('DeriveServiceTemplate consumer: presenter does not' + ' support "_get_model"') return - self.context.modeling.model = \ - self.context.presentation.presenter._get_service_model(self.context) + self.context.modeling.template = \ + self.context.presentation.presenter._get_model(self.context) -class CoerceModelValues(Consumer): +class CoerceServiceTemplateValues(Consumer): """ - Coerces values in the service model. + Coerces values in the service template. """ def consume(self): - self.context.modeling.model.coerce_values(self.context, None, True) + self.context.modeling.template.coerce_values(None, True) -class ValidateModel(Consumer): +class ValidateServiceTemplate(Consumer): """ - Validates the service model. + Validates the service template. """ def consume(self): - self.context.modeling.model.validate(self.context) + self.context.modeling.template.validate() -class Model(ConsumerChain): + +class ServiceTemplate(ConsumerChain): """ - Generates the service model by deriving it from the presentation. + Generates the service template from the presenter. """ def __init__(self, context): - super(Model, self).__init__(context, (Derive, CoerceModelValues, ValidateModel)) + super(ServiceTemplate, self).__init__(context, (DeriveServiceTemplate, + CoerceServiceTemplateValues, + ValidateServiceTemplate)) def dump(self): if self.context.has_arg_switch('yaml'): indent = self.context.get_arg_value_int('indent', 2) - raw = self.context.modeling.model_as_raw + raw = self.context.modeling.template_as_raw self.context.write(yaml_dumps(raw, indent=indent)) elif self.context.has_arg_switch('json'): indent = self.context.get_arg_value_int('indent', 2) - raw = self.context.modeling.model_as_raw + raw = self.context.modeling.template_as_raw self.context.write(json_dumps(raw, indent=indent)) else: - self.context.modeling.model.dump(self.context) + self.context.modeling.template.dump() + class Types(Consumer): """ @@ -88,35 +92,40 @@ class Types(Consumer): raw = self.context.modeling.types_as_raw self.context.write(json_dumps(raw, indent=indent)) else: - self.context.modeling.dump_types(self.context) + self.context.modeling.template.dump_types() -class Instantiate(Consumer): + +class InstantiateServiceInstance(Consumer): """ - Instantiates the service model. + Instantiates the service template into a service instance. """ def consume(self): - if self.context.modeling.model is None: - self.context.validation.report('Instantiate consumer: missing service model') + if self.context.modeling.template is None: + self.context.validation.report('InstantiateServiceInstance consumer: missing service ' + 'model') return - self.context.modeling.model.instantiate(self.context, None) + self.context.modeling.template.instantiate(None) + -class CoerceInstanceValues(Consumer): +class CoerceServiceInstanceValues(Consumer): """ Coerces values in the service instance. 
""" def consume(self): - self.context.modeling.instance.coerce_values(self.context, None, True) + self.context.modeling.instance.coerce_values(None, True) -class ValidateInstance(Consumer): + +class ValidateServiceInstance(Consumer): """ Validates the service instance. """ def consume(self): - self.context.modeling.instance.validate(self.context) + self.context.modeling.instance.validate() + class SatisfyRequirements(Consumer): """ @@ -124,7 +133,8 @@ class SatisfyRequirements(Consumer): """ def consume(self): - self.context.modeling.instance.satisfy_requirements(self.context) + self.context.modeling.instance.satisfy_requirements() + class ValidateCapabilities(Consumer): """ @@ -132,22 +142,27 @@ class ValidateCapabilities(Consumer): """ def consume(self): - self.context.modeling.instance.validate_capabilities(self.context) + self.context.modeling.instance.validate_capabilities() + -class Instance(ConsumerChain): +class ServiceInstance(ConsumerChain): """ - Generates the service instance by instantiating the service model. + Generates the service instance by instantiating the service template. """ def __init__(self, context): - super(Instance, self).__init__(context, (Instantiate, CoerceInstanceValues, - ValidateInstance, CoerceInstanceValues, - SatisfyRequirements, CoerceInstanceValues, - ValidateCapabilities, CoerceInstanceValues)) + super(ServiceInstance, self).__init__(context, (InstantiateServiceInstance, + CoerceServiceInstanceValues, + ValidateServiceInstance, + CoerceServiceInstanceValues, + SatisfyRequirements, + CoerceServiceInstanceValues, + ValidateCapabilities, + CoerceServiceInstanceValues)) def dump(self): if self.context.has_arg_switch('graph'): - self.context.modeling.instance.dump_graph(self.context) + self.context.modeling.instance.dump_graph() elif self.context.has_arg_switch('yaml'): indent = self.context.get_arg_value_int('indent', 2) raw = self.context.modeling.instance_as_raw @@ -157,4 +172,4 @@ class Instance(ConsumerChain): raw = self.context.modeling.instance_as_raw self.context.write(json_dumps(raw, indent=indent)) else: - self.context.modeling.instance.dump(self.context) + self.context.modeling.instance.dump() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/parser/consumption/style.py ---------------------------------------------------------------------- diff --git a/aria/parser/consumption/style.py b/aria/parser/consumption/style.py index 88ad934..72892b9 100644 --- a/aria/parser/consumption/style.py +++ b/aria/parser/consumption/style.py @@ -43,7 +43,7 @@ class Style(object): @staticmethod def literal(value): - return Colored.yellow(safe_repr(value), bold=True) + return Colored.magenta(safe_repr(value)) @staticmethod def meta(value): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/parser/modeling/__init__.py ---------------------------------------------------------------------- diff --git a/aria/parser/modeling/__init__.py b/aria/parser/modeling/__init__.py index cad25ca..df127cd 100644 --- a/aria/parser/modeling/__init__.py +++ b/aria/parser/modeling/__init__.py @@ -14,57 +14,9 @@ # limitations under the License. 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9841ca4a/aria/parser/modeling/context.py
----------------------------------------------------------------------
diff --git a/aria/parser/modeling/context.py b/aria/parser/modeling/context.py
index d47c202..dff5991 100644
--- a/aria/parser/modeling/context.py
+++ b/aria/parser/modeling/context.py
@@ -15,11 +15,8 @@
 
 import itertools
 
-from ...utils.collections import StrictDict, prune, OrderedDict
-from ...utils.formatting import as_raw
-from ...utils.console import puts
-from .types import TypeHierarchy
-from .utils import generate_id_string
+from ...utils.collections import StrictDict, prune
+from ...utils.uuid import generate_uuid
 
 
 class IdType(object):
@@ -35,7 +32,7 @@ class IdType(object):
     UNIVERSAL_RANDOM = 2
     """
-    Universally unique ID (UUID): 25 random safe characters.
+    Universally unique ID (UUID): 22 random safe characters.
""" @@ -43,73 +40,58 @@ class ModelingContext(object): """ Properties: - * :code:`model`: The generated service model + * :code:`template`: The generated service template * :code:`instance`: The generated service instance + * :code:`node_id_format`: Format for node instance IDs * :code:`id_type`: Type of IDs to use for instances * :code:`id_max_length`: Maximum allowed instance ID length * :code:`inputs`: Dict of inputs values - * :code:`node_types`: The generated hierarchy of node types - * :code:`group_types`: The generated hierarchy of group types - * :code:`capability_types`: The generated hierarchy of capability types - * :code:`relationship_types`: The generated hierarchy of relationship types - * :code:`policy_types`: The generated hierarchy of policy types - * :code:`policy_trigger_types`: The generated hierarchy of policy trigger types - * :code:`artifact_types`: The generated hierarchy of artifact types - * :code:`interface_types`: The generated hierarchy of interface types """ def __init__(self): - self.model = None + self.template = None self.instance = None + self.node_id_format = '{template}_{id}' #self.id_type = IdType.LOCAL_SERIAL #self.id_type = IdType.LOCAL_RANDOM self.id_type = IdType.UNIVERSAL_RANDOM self.id_max_length = 63 # See: http://www.faqs.org/rfcs/rfc1035.html self.inputs = StrictDict(key_class=basestring) - self.node_types = TypeHierarchy() - self.group_types = TypeHierarchy() - self.capability_types = TypeHierarchy() - self.relationship_types = TypeHierarchy() - self.policy_types = TypeHierarchy() - self.policy_trigger_types = TypeHierarchy() - self.artifact_types = TypeHierarchy() - self.interface_types = TypeHierarchy() self._serial_id_counter = itertools.count(1) self._locally_unique_ids = set() + def store(self, model_storage): + if self.template is not None: + model_storage.service_template.put(self.template) + if self.instance is not None: + model_storage.service.put(self.instance) + + def generate_node_id(self, template_name): + return self.node_id_format.format( + template=template_name, + id=self.generate_id()) + def generate_id(self): if self.id_type == IdType.LOCAL_SERIAL: return self._serial_id_counter.next() elif self.id_type == IdType.LOCAL_RANDOM: - the_id = generate_id_string(6) + the_id = generate_uuid(6) while the_id in self._locally_unique_ids: - the_id = generate_id_string(6) + the_id = generate_uuid(6) self._locally_unique_ids.add(the_id) return the_id - return generate_id_string() + return generate_uuid() def set_input(self, name, value): self.inputs[name] = value # TODO: coerce to validate type @property - def types_as_raw(self): - return OrderedDict(( - ('node_types', as_raw(self.node_types)), - ('group_types', as_raw(self.group_types)), - ('capability_types', as_raw(self.capability_types)), - ('relationship_types', as_raw(self.relationship_types)), - ('policy_types', as_raw(self.policy_types)), - ('policy_trigger_types', as_raw(self.policy_trigger_types)), - ('artifact_types', as_raw(self.artifact_types)), - ('interface_types', as_raw(self.interface_types)))) - - @property - def model_as_raw(self): - raw = self.model.as_raw + def template_as_raw(self): + raw = self.template.as_raw prune(raw) return raw @@ -118,29 +100,3 @@ class ModelingContext(object): raw = self.instance.as_raw prune(raw) return raw - - def dump_types(self, context): - if self.node_types.children: - puts('Node types:') - self.node_types.dump(context) - if self.group_types.children: - puts('Group types:') - self.group_types.dump(context) - if 
-        if self.capability_types.children:
-            puts('Capability types:')
-            self.capability_types.dump(context)
-        if self.relationship_types.children:
-            puts('Relationship types:')
-            self.relationship_types.dump(context)
-        if self.policy_types.children:
-            puts('Policy types:')
-            self.policy_types.dump(context)
-        if self.policy_trigger_types.children:
-            puts('Policy trigger types:')
-            self.policy_trigger_types.dump(context)
-        if self.artifact_types.children:
-            puts('Artifact types:')
-            self.artifact_types.dump(context)
-        if self.interface_types.children:
-            puts('Interface types:')
-            self.interface_types.dump(context)
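Putting the new `node_id_format` and `generate_node_id` together, node instance IDs are now minted roughly like this sketch; `LOCAL_SERIAL` is forced here only to make the output deterministic (the default is `UNIVERSAL_RANDOM`):

    from aria.parser.modeling import IdType, ModelingContext

    context = ModelingContext()
    context.id_type = IdType.LOCAL_SERIAL

    # '{template}_{id}' formatting, with the serial counter starting at 1:
    print(context.generate_node_id('web_server'))  # -> 'web_server_1'
    print(context.generate_node_id('web_server'))  # -> 'web_server_2'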
