ARIA-44 Merge parser and storage model Additional Changes: * As a part of this task the name to address the model has been changes to the __tablename__ (or __mapiname__ if specified) * All of the relationships return a query, so accessing the entire list should be done via all()
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/b6193359 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/b6193359 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/b6193359 Branch: refs/heads/ARIA-79-concurrent-storage-modifications Commit: b6193359239ec673f3d7b313dd04122302e75ba6 Parents: 1498ad3 Author: mxmrlv <mxm...@gmail.com> Authored: Thu Jan 19 11:39:36 2017 +0200 Committer: mxmrlv <mxm...@gmail.com> Committed: Sun Feb 12 18:12:13 2017 +0200 ---------------------------------------------------------------------- aria/__init__.py | 49 +- aria/cli/args_parser.py | 4 +- aria/cli/commands.py | 15 +- aria/orchestrator/context/common.py | 22 +- aria/orchestrator/context/operation.py | 43 +- aria/orchestrator/context/toolbelt.py | 2 +- aria/orchestrator/context/workflow.py | 22 +- aria/orchestrator/runner.py | 11 +- aria/orchestrator/workflows/api/task.py | 149 +- .../workflows/builtin/execute_operation.py | 54 +- aria/orchestrator/workflows/builtin/heal.py | 14 +- aria/orchestrator/workflows/builtin/install.py | 16 +- aria/orchestrator/workflows/builtin/start.py | 6 +- aria/orchestrator/workflows/builtin/stop.py | 6 +- .../orchestrator/workflows/builtin/uninstall.py | 18 +- aria/orchestrator/workflows/builtin/utils.py | 52 +- .../orchestrator/workflows/builtin/workflows.py | 175 ++- aria/orchestrator/workflows/core/engine.py | 2 +- aria/orchestrator/workflows/core/task.py | 26 +- aria/orchestrator/workflows/executor/process.py | 14 +- aria/orchestrator/workflows/executor/thread.py | 2 +- aria/parser/modeling/__init__.py | 8 +- aria/parser/modeling/storage.py | 224 ++- aria/parser/modeling/utils.py | 4 +- aria/storage/__init__.py | 13 +- aria/storage/api.py | 6 +- aria/storage/base_model.py | 757 ---------- aria/storage/core.py | 7 +- aria/storage/instrumentation.py | 15 +- aria/storage/model.py | 110 -- aria/storage/modeling/__init__.py | 35 + aria/storage/modeling/elements.py | 106 ++ aria/storage/modeling/instance_elements.py | 1286 ++++++++++++++++ aria/storage/modeling/model.py | 219 +++ aria/storage/modeling/orchestrator_elements.py | 468 ++++++ aria/storage/modeling/structure.py | 320 ++++ aria/storage/modeling/template_elements.py | 1387 ++++++++++++++++++ aria/storage/modeling/type.py | 302 ++++ aria/storage/modeling/utils.py | 139 ++ aria/storage/sql_mapi.py | 6 +- aria/storage/structure.py | 190 --- aria/storage/type.py | 299 ---- aria/storage_initializer.py | 135 ++ aria/utils/application.py | 24 +- .../profiles/tosca-simple-1.0/groups.yaml | 2 +- tests/mock/context.py | 4 +- tests/mock/models.py | 123 +- tests/mock/topology.py | 100 +- tests/orchestrator/context/test_operation.py | 125 +- .../context/test_resource_render.py | 2 +- tests/orchestrator/context/test_serialize.py | 18 +- tests/orchestrator/context/test_toolbelt.py | 75 +- tests/orchestrator/context/test_workflow.py | 14 +- .../orchestrator/execution_plugin/test_local.py | 63 +- tests/orchestrator/execution_plugin/test_ssh.py | 21 +- tests/orchestrator/test_runner.py | 11 +- tests/orchestrator/workflows/__init__.py | 2 +- tests/orchestrator/workflows/api/test_task.py | 83 +- .../workflows/builtin/test_execute_operation.py | 11 +- .../orchestrator/workflows/builtin/test_heal.py | 8 +- .../orchestrator/workflows/core/test_engine.py | 25 +- tests/orchestrator/workflows/core/test_task.py | 80 +- .../test_task_graph_into_exececution_graph.py | 20 +- .../workflows/executor/test_executor.py | 9 +- .../workflows/executor/test_process_executor.py | 8 +- .../executor/test_process_executor_extension.py | 15 +- .../test_process_executor_tracked_changes.py | 51 +- tests/resources/scripts/test_ssh.sh | 30 +- .../service_templates/node-cellar/workflows.py | 9 +- tests/storage/__init__.py | 11 +- tests/storage/test_instrumentation.py | 17 +- tests/storage/test_model_storage.py | 45 +- tests/storage/test_models.py | 643 ++++---- tests/storage/test_structures.py | 135 +- 74 files changed, 5797 insertions(+), 2725 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/__init__.py ---------------------------------------------------------------------- diff --git a/aria/__init__.py b/aria/__init__.py index 8b87473..18eaa56 100644 --- a/aria/__init__.py +++ b/aria/__init__.py @@ -61,26 +61,47 @@ def application_model_storage(api, api_kwargs=None, initiator=None, initiator_kw """ Initiate model storage """ - models = [ - storage.model.Plugin, + models_to_register = [ + storage.modeling.model.Parameter, - storage.model.Blueprint, - storage.model.Deployment, - storage.model.DeploymentUpdate, - storage.model.DeploymentUpdateStep, - storage.model.DeploymentModification, + storage.modeling.model.MappingTemplate, + storage.modeling.model.SubstitutionTemplate, + storage.modeling.model.ServiceTemplate, + storage.modeling.model.NodeTemplate, + storage.modeling.model.GroupTemplate, + storage.modeling.model.InterfaceTemplate, + storage.modeling.model.OperationTemplate, + storage.modeling.model.ArtifactTemplate, + storage.modeling.model.PolicyTemplate, + storage.modeling.model.GroupPolicyTemplate, + storage.modeling.model.GroupPolicyTriggerTemplate, + storage.modeling.model.RequirementTemplate, + storage.modeling.model.CapabilityTemplate, - storage.model.Node, - storage.model.NodeInstance, - storage.model.Relationship, - storage.model.RelationshipInstance, + storage.modeling.model.Mapping, + storage.modeling.model.Substitution, + storage.modeling.model.ServiceInstance, + storage.modeling.model.Node, + storage.modeling.model.Group, + storage.modeling.model.Interface, + storage.modeling.model.Operation, + storage.modeling.model.Capability, + storage.modeling.model.Artifact, + storage.modeling.model.Policy, + storage.modeling.model.GroupPolicy, + storage.modeling.model.GroupPolicyTrigger, + storage.modeling.model.Relationship, - storage.model.Execution, - storage.model.Task, + storage.modeling.model.Execution, + storage.modeling.model.ServiceInstanceUpdate, + storage.modeling.model.ServiceInstanceUpdateStep, + storage.modeling.model.ServiceInstanceModification, + storage.modeling.model.Plugin, + storage.modeling.model.Task ] return storage.ModelStorage(api_cls=api, api_kwargs=api_kwargs, - items=models, + items=models_to_register, initiator=initiator, initiator_kwargs=initiator_kwargs or {}) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/cli/args_parser.py ---------------------------------------------------------------------- diff --git a/aria/cli/args_parser.py b/aria/cli/args_parser.py index e661620..50fec39 100644 --- a/aria/cli/args_parser.py +++ b/aria/cli/args_parser.py @@ -138,9 +138,9 @@ def add_workflow_parser(workflow): default='install', help='The workflow name') workflow.add_argument( - '-d', '--deployment-id', + '-i', '--service-instance-id', required=False, - help='A unique ID for the deployment') + help='A unique ID for the service instance') @sub_parser_decorator( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/cli/commands.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands.py b/aria/cli/commands.py index 0890cd1..91d748f 100644 --- a/aria/cli/commands.py +++ b/aria/cli/commands.py @@ -42,7 +42,7 @@ from ..parser.consumption import ( Instance ) from ..parser.loading import LiteralLocation, UriLocation -from ..parser.modeling import initialize_storage +from ..parser.modeling.storage import initialize_storage from ..utils.application import StorageManager from ..utils.caching import cachedmethod from ..utils.console import (puts, Colored, indent) @@ -211,10 +211,10 @@ class WorkflowCommand(BaseCommand): def __call__(self, args_namespace, unknown_args): super(WorkflowCommand, self).__call__(args_namespace, unknown_args) - deployment_id = args_namespace.deployment_id or 1 + service_instance_id = args_namespace.service_instance_id or 1 context = self._parse(args_namespace.uri) workflow_fn, inputs = self._get_workflow(context, args_namespace.workflow) - self._run(context, args_namespace.workflow, workflow_fn, inputs, deployment_id) + self._run(context, args_namespace.workflow, workflow_fn, inputs, service_instance_id) def _parse(self, uri): # Parse @@ -259,13 +259,14 @@ class WorkflowCommand(BaseCommand): return workflow_fn, inputs - def _run(self, context, workflow_name, workflow_fn, inputs, deployment_id): + def _run(self, context, workflow_name, workflow_fn, inputs, service_instance_id): # Storage def _initialize_storage(model_storage): - initialize_storage(context, model_storage, deployment_id) + initialize_storage(context, model_storage, service_instance_id) # Create runner - runner = Runner(workflow_name, workflow_fn, inputs, _initialize_storage, deployment_id) + runner = Runner(workflow_name, workflow_fn, inputs, _initialize_storage, + service_instance_id) # Run runner.run() @@ -366,7 +367,7 @@ class ExecuteCommand(BaseCommand): FileSystemResourceDriver(local_resource_storage())) model_storage = application_model_storage( FileSystemModelDriver(local_model_storage())) - deployment = model_storage.deployment.get(args_namespace.deployment_id) + deployment = model_storage.service_instance.get(args_namespace.deployment_id) try: workflow = deployment.workflows[args_namespace.workflow_id] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 6ab27ef..37482cf 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -31,7 +31,7 @@ class BaseContext(logger.LoggerMixin): def __init__( self, name, - deployment_id, + service_instance_id, model_storage, resource_storage, workdir=None, @@ -41,13 +41,13 @@ class BaseContext(logger.LoggerMixin): self._id = str(uuid4()) self._model = model_storage self._resource = resource_storage - self._deployment_id = deployment_id + self._service_instance_id = service_instance_id self._workdir = workdir def __repr__(self): return ( '{name}(name={self.name}, ' - 'deployment_id={self._deployment_id}, ' + 'deployment_id={self._service_instance_id}, ' .format(name=self.__class__.__name__, self=self)) @property @@ -67,18 +67,18 @@ class BaseContext(logger.LoggerMixin): return self._resource @property - def blueprint(self): + def service_template(self): """ The blueprint model """ - return self.deployment.blueprint + return self.service_instance.service_template @property - def deployment(self): + def service_instance(self): """ The deployment model """ - return self.model.deployment.get(self._deployment_id) + return self.model.service_instance.get(self._service_instance_id) @property def name(self): @@ -101,11 +101,11 @@ class BaseContext(logger.LoggerMixin): Download a blueprint resource from the resource storage """ try: - self.resource.deployment.download(entry_id=str(self.deployment.id), + self.resource.deployment.download(entry_id=str(self.service_instance.id), destination=destination, path=path) except exceptions.StorageError: - self.resource.blueprint.download(entry_id=str(self.blueprint.id), + self.resource.blueprint.download(entry_id=str(self.service_template.id), destination=destination, path=path) @@ -126,9 +126,9 @@ class BaseContext(logger.LoggerMixin): Read a deployment resource as string from the resource storage """ try: - return self.resource.deployment.read(entry_id=str(self.deployment.id), path=path) + return self.resource.deployment.read(entry_id=str(self.service_instance.id), path=path) except exceptions.StorageError: - return self.resource.blueprint.read(entry_id=str(self.blueprint.id), path=path) + return self.resource.deployment.read(entry_id=str(self.service_template.id), path=path) def get_resource_and_render(self, path=None, variables=None): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index d1f61b2..c5ac8f0 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -31,7 +31,7 @@ class BaseOperationContext(BaseContext): name, model_storage, resource_storage, - deployment_id, + service_instance_id, task_id, actor_id, **kwargs): @@ -39,7 +39,7 @@ class BaseOperationContext(BaseContext): name=name, model_storage=model_storage, resource_storage=resource_storage, - deployment_id=deployment_id, + service_instance_id=service_instance_id, **kwargs) self._task_id = task_id self._actor_id = actor_id @@ -69,7 +69,7 @@ class BaseOperationContext(BaseContext): if not self.task.plugin_name: return None plugin_workdir = '{0}/plugins/{1}/{2}'.format(self._workdir, - self.deployment.id, + self.service_instance.id, self.task.plugin_name) file.makedirs(plugin_workdir) return plugin_workdir @@ -79,7 +79,7 @@ class BaseOperationContext(BaseContext): context_cls = self.__class__ context_dict = { 'name': self.name, - 'deployment_id': self._deployment_id, + 'service_instance_id': self._service_instance_id, 'task_id': self._task_id, 'actor_id': self._actor_id, 'workdir': self._workdir, @@ -106,20 +106,20 @@ class NodeOperationContext(BaseOperationContext): Context for node based operations. """ @property - def node(self): + def node_template(self): """ the node of the current operation :return: """ - return self.node_instance.node + return self.node.node_template @property - def node_instance(self): + def node(self): """ The node instance of the current operation :return: """ - return self.model.node_instance.get(self._actor_id) + return self.model.node.get(self._actor_id) class RelationshipOperationContext(BaseOperationContext): @@ -127,50 +127,41 @@ class RelationshipOperationContext(BaseOperationContext): Context for relationship based operations. """ @property - def source_node(self): + def source_node_template(self): """ The source node :return: """ - return self.relationship.source_node + return self.source_node.node_template @property - def source_node_instance(self): + def source_node(self): """ The source node instance :return: """ - return self.relationship_instance.source_node_instance + return self.relationship.source_node @property - def target_node(self): + def target_node_template(self): """ The target node :return: """ - return self.relationship.target_node + return self.target_node.node_template @property - def target_node_instance(self): + def target_node(self): """ The target node instance :return: """ - return self.relationship_instance.target_node_instance + return self.relationship.target_node @property def relationship(self): """ - The relationship of the current operation - :return: - """ - - return self.relationship_instance.relationship - - @property - def relationship_instance(self): - """ The relationship instance of the current operation :return: """ - return self.model.relationship_instance.get(self._actor_id) + return self.model.relationship.get(self._actor_id) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/orchestrator/context/toolbelt.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/toolbelt.py b/aria/orchestrator/context/toolbelt.py index 301b013..def7d42 100644 --- a/aria/orchestrator/context/toolbelt.py +++ b/aria/orchestrator/context/toolbelt.py @@ -33,7 +33,7 @@ class NodeToolBelt(object): :return: """ assert isinstance(self._op_context, operation.NodeOperationContext) - host = self._op_context.node_instance.host + host = self._op_context.node.host return host.runtime_properties.get('ip') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py index a15790e..00ed974 100644 --- a/aria/orchestrator/context/workflow.py +++ b/aria/orchestrator/context/workflow.py @@ -49,18 +49,16 @@ class WorkflowContext(BaseContext): def __repr__(self): return ( - '{name}(deployment_id={self._deployment_id}, ' + '{name}(deployment_id={self._service_instance_id}, ' 'workflow_name={self._workflow_name}'.format( name=self.__class__.__name__, self=self)) def _create_execution(self): - execution_cls = self.model.execution.model_cls now = datetime.utcnow() execution = self.model.execution.model_cls( - deployment=self.deployment, + service_instance=self.service_instance, workflow_name=self._workflow_name, created_at=now, - status=execution_cls.PENDING, parameters=self.parameters, ) self.model.execution.put(execution) @@ -81,27 +79,27 @@ class WorkflowContext(BaseContext): self.model.execution.put(value) @property - def nodes(self): + def node_templates(self): """ Iterator over nodes """ - key = 'deployment_{0}'.format(self.model.node.model_cls.name_column_name()) + key = 'service_instance_{0}'.format(self.model.node_template.model_cls.name_column_name()) - return self.model.node.iter( + return self.model.node_template.iter( filters={ - key: getattr(self.deployment, self.deployment.name_column_name()) + key: getattr(self.service_instance, self.service_instance.name_column_name()) } ) @property - def node_instances(self): + def nodes(self): """ Iterator over node instances """ - key = 'deployment_{0}'.format(self.model.node_instance.model_cls.name_column_name()) - return self.model.node_instance.iter( + key = 'service_instance_{0}'.format(self.model.node.model_cls.name_column_name()) + return self.model.node.iter( filters={ - key: getattr(self.deployment, self.deployment.name_column_name()) + key: getattr(self.service_instance, self.service_instance.name_column_name()) } ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/orchestrator/runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/runner.py b/aria/orchestrator/runner.py index 5950dc5..8927de9 100644 --- a/aria/orchestrator/runner.py +++ b/aria/orchestrator/runner.py @@ -50,7 +50,7 @@ class Runner(object): """ def __init__(self, workflow_name, workflow_fn, inputs, initialize_model_storage_fn, - deployment_id, storage_path='', is_storage_temporary=True): + service_instance_id, storage_path='', is_storage_temporary=True): if storage_path == '': # Temporary file storage the_file, storage_path = tempfile.mkstemp(suffix='.db', prefix='aria-') @@ -61,7 +61,7 @@ class Runner(object): self._storage_name = os.path.basename(storage_path) self._is_storage_temporary = is_storage_temporary - workflow_context = self.create_workflow_context(workflow_name, deployment_id, + workflow_context = self.create_workflow_context(workflow_name, service_instance_id, initialize_model_storage_fn) tasks_graph = workflow_fn(ctx=workflow_context, **inputs) @@ -77,7 +77,10 @@ class Runner(object): finally: self.cleanup() - def create_workflow_context(self, workflow_name, deployment_id, initialize_model_storage_fn): + def create_workflow_context(self, + workflow_name, + service_instance_id, + initialize_model_storage_fn): self.cleanup() model_storage = application_model_storage( sql_mapi.SQLAlchemyModelAPI, @@ -89,7 +92,7 @@ class Runner(object): name=workflow_name, model_storage=model_storage, resource_storage=resource_storage, - deployment_id=deployment_id, + service_instance_id=service_instance_id, workflow_name=self.__class__.__name__, task_max_attempts=1, task_retry_interval=1) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index 44715c1..6a00844 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -18,7 +18,7 @@ Provides the tasks to be entered into the task graph """ from uuid import uuid4 -from aria.storage import model +from aria.storage.modeling import model from ... import context from .. import exceptions @@ -57,13 +57,13 @@ class OperationTask(BaseTask): Represents an operation task in the task_graph """ - SOURCE_OPERATION = 'source_operations' - TARGET_OPERATION = 'target_operations' + SOURCE_OPERATION = 'source' + TARGET_OPERATION = 'target' def __init__(self, name, actor, - operation_mapping, + implementation, max_attempts=None, retry_interval=None, ignore_failure=None, @@ -76,12 +76,12 @@ class OperationTask(BaseTask): :param actor: the operation host on which this operation is registered. :param inputs: operation inputs. """ - assert isinstance(actor, (model.NodeInstance, - model.RelationshipInstance)) + assert isinstance(actor, (model.Node, + model.Relationship)) super(OperationTask, self).__init__() self.actor = actor self.name = '{name}.{actor.id}'.format(name=name, actor=actor) - self.operation_mapping = operation_mapping + self.implementation = implementation self.inputs = inputs or {} self.plugin = plugin or {} self.max_attempts = (self.workflow_context._task_max_attempts @@ -93,70 +93,133 @@ class OperationTask(BaseTask): self.runs_on = runs_on @classmethod - def node_instance(cls, instance, name, inputs=None, *args, **kwargs): + def _merge_inputs(cls, operation_inputs, additional_inputs=None): + final_inputs = dict((p.name, p.as_raw['value']) for p in operation_inputs) + final_inputs.update(additional_inputs or {}) + return final_inputs + + @classmethod + def node(cls, instance, name, inputs=None, *args, **kwargs): """ Represents a node based operation :param instance: the node of which this operation belongs to. :param name: the name of the operation. """ - assert isinstance(instance, model.NodeInstance) - return cls._instance(instance=instance, - name=name, - operation_details=instance.node.operations[name], - inputs=inputs, - plugins=instance.node.plugins or [], - runs_on=model.Task.RUNS_ON_NODE_INSTANCE, - *args, - **kwargs) + assert isinstance(instance, model.Node) + interface_name = _get_interface_name(name) + interfaces = instance.interfaces.filter_by(name=interface_name) + if interfaces.count() > 1: + raise exceptions.TaskException( + "More than one interface with the same name `{0}` found".format(name) + ) + elif interfaces.count() == 0: + raise exceptions.TaskException( + "No Interface with the name `{interface_name}` found".format( + interface_name=interface_name) + ) + + operation_templates = interfaces[0].operations.filter_by(name=name) + if operation_templates.count() > 1: + raise exceptions.TaskException( + "More than one operation with the same name `{0}` were found".format(name) + ) + + elif operation_templates.count() == 0: + raise exceptions.TaskException( + "No interface with the name `{operation_name}` found".format( + operation_name=name) + ) + + return cls._instance( + instance=instance, + name=name, + operation_template=operation_templates[0], + plugins=instance.plugins or [], + runs_on=model.Task.RUNS_ON_NODE_INSTANCE, + inputs=cls._merge_inputs(operation_templates[0].inputs, inputs), + *args, + **kwargs) @classmethod - def relationship_instance(cls, instance, name, operation_end, inputs=None, *args, **kwargs): + def relationship(cls, instance, name, edge, runs_on=None, inputs=None, *args, + **kwargs): """ Represents a relationship based operation :param instance: the relationship of which this operation belongs to. :param name: the name of the operation. - :param operation_end: source or target end of the relationship, this corresponds directly - with 'source_operations' and 'target_operations' + :param edge: the edge of the interface ("source" or "target"). + :param runs_on: where to run the operation ("source" or "target"); if None defaults to the + interface edge. :param inputs any additional inputs to the operation """ - assert isinstance(instance, model.RelationshipInstance) - if operation_end not in [cls.TARGET_OPERATION, cls.SOURCE_OPERATION]: - raise exceptions.TaskException('The operation end should be {0} or {1}'.format( - cls.TARGET_OPERATION, cls.SOURCE_OPERATION - )) - operation_details = getattr(instance.relationship, operation_end)[name] - if operation_end == cls.SOURCE_OPERATION: - plugins = instance.relationship.source_node.plugins - runs_on = model.Task.RUNS_ON_SOURCE + assert isinstance(instance, model.Relationship) + interface_name = _get_interface_name(name) + interfaces = instance.interfaces.filter_by(name=interface_name, edge=edge) + count = interfaces.count() + if count > 1: + raise exceptions.TaskException( + "More than one interface with the same name `{interface_name}` found at `{edge}`" + + " edge".format( + interface_name=interface_name, edge=edge) + ) + elif count == 0: + raise exceptions.TaskException( + "No interface with the name `{interface_name}` found at `{edge}` edge".format( + interface_name=interface_name, edge=edge) + ) + + operations = interfaces.all()[0].operations.filter_by(name=name) + count = operations.count() + if count > 1: + raise exceptions.TaskException( + "More than one operation with the same name `{0}` found".format(name) + ) + elif count == 0: + raise exceptions.TaskException( + "No operation with the name `{operation_name}` found".format( + operation_name=name) + ) + + if not runs_on: + if edge == cls.SOURCE_OPERATION: + runs_on = model.Task.RUNS_ON_SOURCE + else: + runs_on = model.Task.RUNS_ON_TARGET + + if runs_on == model.Task.RUNS_ON_SOURCE: + plugins = instance.source_node.plugins else: - plugins = instance.relationship.target_node.plugins - runs_on = model.Task.RUNS_ON_TARGET + plugins = instance.target_node.plugins + return cls._instance(instance=instance, name=name, - operation_details=operation_details, - inputs=inputs, + operation_template=operations[0], plugins=plugins or [], runs_on=runs_on, + inputs=cls._merge_inputs(operations[0].inputs, inputs), *args, **kwargs) @classmethod - def _instance(cls, instance, name, operation_details, inputs, plugins, runs_on, *args, + def _instance(cls, + instance, + name, + operation_template, + inputs, + plugins, + runs_on, + *args, **kwargs): - operation_mapping = operation_details.get('operation') - operation_inputs = operation_details.get('inputs', {}) - operation_inputs.update(inputs or {}) - plugin_name = operation_details.get('plugin') - matching_plugins = [p for p in plugins if p['name'] == plugin_name] + matching_plugins = [p for p in plugins if p['name'] == operation_template.plugin] # All matching plugins should have identical package_name/package_version, so it's safe to # take the first found. plugin = matching_plugins[0] if matching_plugins else {} return cls(actor=instance, name=name, - operation_mapping=operation_mapping, - inputs=operation_inputs, + implementation=operation_template.implementation, + inputs=inputs, plugin=plugin, runs_on=runs_on, *args, @@ -197,3 +260,7 @@ class StubTask(BaseTask): Enables creating empty tasks. """ pass + + +def _get_interface_name(operation_name): + return operation_name.rsplit('.', 1)[0] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/orchestrator/workflows/builtin/execute_operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/execute_operation.py b/aria/orchestrator/workflows/builtin/execute_operation.py index e76993d..5a7f6ce 100644 --- a/aria/orchestrator/workflows/builtin/execute_operation.py +++ b/aria/orchestrator/workflows/builtin/execute_operation.py @@ -30,8 +30,8 @@ def execute_operation( allow_kwargs_override, run_by_dependency_order, type_names, + node_template_ids, node_ids, - node_instance_ids, **kwargs): """ The execute_operation workflow @@ -43,32 +43,32 @@ def execute_operation( :param bool allow_kwargs_override: :param bool run_by_dependency_order: :param type_names: + :param node_template_ids: :param node_ids: - :param node_instance_ids: :param kwargs: :return: """ subgraphs = {} # filtering node instances - filtered_node_instances = list(_filter_node_instances( + filtered_nodes = list(_filter_node_instances( context=ctx, + node_template_ids=node_template_ids, node_ids=node_ids, - node_instance_ids=node_instance_ids, type_names=type_names)) if run_by_dependency_order: filtered_node_instances_ids = set(node_instance.id - for node_instance in filtered_node_instances) - for node_instance in ctx.node_instances: - if node_instance.id not in filtered_node_instances_ids: - subgraphs[node_instance.id] = ctx.task_graph( - name='execute_operation_stub_{0}'.format(node_instance.id)) + for node_instance in filtered_nodes) + for node in ctx.node_instances: + if node.id not in filtered_node_instances_ids: + subgraphs[node.id] = ctx.task_graph( + name='execute_operation_stub_{0}'.format(node.id)) # registering actual tasks to sequences - for node_instance in filtered_node_instances: + for node in filtered_nodes: graph.add_tasks( _create_node_instance_task( - node_instance=node_instance, + nodes=node, operation=operation, operation_kwargs=operation_kwargs, allow_kwargs_override=allow_kwargs_override @@ -80,37 +80,37 @@ def execute_operation( # adding tasks dependencies if required if run_by_dependency_order: - for node_instance in ctx.node_instances: - for relationship_instance in node_instance.relationship_instances: - graph.add_dependency(source_task=subgraphs[node_instance.id], - after=[subgraphs[relationship_instance.target_id]]) + for node in ctx.nodes: + for relationship in node.relationships: + graph.add_dependency( + source_task=subgraphs[node.id], after=[subgraphs[relationship.target_id]]) -def _filter_node_instances(context, node_ids=(), node_instance_ids=(), type_names=()): +def _filter_node_instances(context, node_template_ids=(), node_ids=(), type_names=()): def _is_node_by_id(node_id): - return not node_ids or node_id in node_ids + return not node_template_ids or node_id in node_template_ids def _is_node_instance_by_id(node_instance_id): - return not node_instance_ids or node_instance_id in node_instance_ids + return not node_ids or node_instance_id in node_ids def _is_node_by_type(node_type_hierarchy): return not type_names or node_type_hierarchy in type_names - for node_instance in context.node_instances: - if all((_is_node_by_id(node_instance.node.id), - _is_node_instance_by_id(node_instance.id), - _is_node_by_type(node_instance.node.type_hierarchy))): - yield node_instance + for node in context.nodes: + if all((_is_node_by_id(node.node_template.id), + _is_node_instance_by_id(node.id), + _is_node_by_type(node.node_template.type_hierarchy))): + yield node def _create_node_instance_task( - node_instance, + nodes, operation, operation_kwargs, allow_kwargs_override): """ A workflow which executes a single operation - :param node_instance: the node instance to install + :param nodes: the node instance to install :param basestring operation: the operation name :param dict operation_kwargs: :param bool allow_kwargs_override: @@ -120,7 +120,7 @@ def _create_node_instance_task( if allow_kwargs_override is not None: operation_kwargs['allow_kwargs_override'] = allow_kwargs_override - return OperationTask.node_instance( - instance=node_instance, + return OperationTask.node( + instance=nodes, name=operation, inputs=operation_kwargs) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/orchestrator/workflows/builtin/heal.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/heal.py b/aria/orchestrator/workflows/builtin/heal.py index 87ac492..2592323 100644 --- a/aria/orchestrator/workflows/builtin/heal.py +++ b/aria/orchestrator/workflows/builtin/heal.py @@ -21,7 +21,7 @@ Builtin heal workflow from aria import workflow -from .workflows import (install_node_instance, uninstall_node_instance) +from .workflows import (install_node, uninstall_node) from ..api import task @@ -35,8 +35,8 @@ def heal(ctx, graph, node_instance_id): :param node_instance_id: the id of the node instance to heal :return: """ - failing_node = ctx.model.node_instance.get(node_instance_id) - host_node = ctx.model.node_instance.get(failing_node.host.id) + failing_node = ctx.model.node.get(node_instance_id) + host_node = ctx.model.node.get(failing_node.host.id) failed_node_instance_subgraph = _get_contained_subgraph(ctx, host_node) failed_node_instance_ids = list(n.id for n in failed_node_instance_subgraph) @@ -78,7 +78,7 @@ def heal_uninstall(ctx, graph, failing_node_instances, targeted_node_instances): # create install sub workflow for every node instance for node_instance in failing_node_instances: - node_instance_sub_workflow = task.WorkflowTask(uninstall_node_instance, + node_instance_sub_workflow = task.WorkflowTask(uninstall_node, node_instance=node_instance) node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow graph.add_tasks(node_instance_sub_workflow) @@ -98,7 +98,7 @@ def heal_uninstall(ctx, graph, failing_node_instances, targeted_node_instances): for relationship_instance in reversed(node_instance.outbound_relationship_instances): target_node_instance = \ - ctx.model.node_instance.get(relationship_instance.target_node_instance.id) + ctx.model.node.get(relationship_instance.target_node_instance.id) target_node_instance_subgraph = node_instance_sub_workflows[target_node_instance.id] graph.add_dependency(target_node_instance_subgraph, node_instance_sub_workflow) @@ -131,7 +131,7 @@ def heal_install(ctx, graph, failing_node_instances, targeted_node_instances): # create install sub workflow for every node instance for node_instance in failing_node_instances: - node_instance_sub_workflow = task.WorkflowTask(install_node_instance, + node_instance_sub_workflow = task.WorkflowTask(install_node, node_instance=node_instance) node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow graph.add_tasks(node_instance_sub_workflow) @@ -151,7 +151,7 @@ def heal_install(ctx, graph, failing_node_instances, targeted_node_instances): node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] for relationship_instance in node_instance.outbound_relationship_instances: - target_node_instance = ctx.model.node_instance.get( + target_node_instance = ctx.model.node.get( relationship_instance.target_node_instance.id) target_node_instance_subworkflow = node_instance_sub_workflows[target_node_instance.id] graph.add_dependency(node_instance_sub_workflow, target_node_instance_subworkflow) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/orchestrator/workflows/builtin/install.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/install.py b/aria/orchestrator/workflows/builtin/install.py index 81d1da2..2b9ec66 100644 --- a/aria/orchestrator/workflows/builtin/install.py +++ b/aria/orchestrator/workflows/builtin/install.py @@ -17,18 +17,16 @@ Builtin install workflow """ -from .workflows import install_node_instance -from .utils import create_node_instance_task_dependencies +from .workflows import install_node +from .utils import create_node_task_dependencies from ..api.task import WorkflowTask from ... import workflow @workflow def install(ctx, graph): - tasks_and_node_instances = [] - for node_instance in ctx.model.node_instance.iter(): - tasks_and_node_instances.append(( - WorkflowTask(install_node_instance, node_instance=node_instance), - node_instance)) - graph.add_tasks([task for task, _ in tasks_and_node_instances]) - create_node_instance_task_dependencies(graph, tasks_and_node_instances) + tasks_and_nodes = [] + for node in ctx.nodes: + tasks_and_nodes.append((WorkflowTask(install_node, node=node), node)) + graph.add_tasks([task for task, _ in tasks_and_nodes]) + create_node_task_dependencies(graph, tasks_and_nodes) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/orchestrator/workflows/builtin/start.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/start.py b/aria/orchestrator/workflows/builtin/start.py index a044d73..ad67554 100644 --- a/aria/orchestrator/workflows/builtin/start.py +++ b/aria/orchestrator/workflows/builtin/start.py @@ -17,12 +17,12 @@ Builtin start workflow """ -from .workflows import start_node_instance +from .workflows import start_node from ..api.task import WorkflowTask from ... import workflow @workflow def start(ctx, graph): - for node_instance in ctx.model.node_instance.iter(): - graph.add_tasks(WorkflowTask(start_node_instance, node_instance=node_instance)) + for node in ctx.model.node.iter(): + graph.add_tasks(WorkflowTask(start_node, node=node)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/orchestrator/workflows/builtin/stop.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/stop.py b/aria/orchestrator/workflows/builtin/stop.py index 584c0d3..23ac366 100644 --- a/aria/orchestrator/workflows/builtin/stop.py +++ b/aria/orchestrator/workflows/builtin/stop.py @@ -17,12 +17,12 @@ Builtin stop workflow """ -from .workflows import stop_node_instance +from .workflows import stop_node from ..api.task import WorkflowTask from ... import workflow @workflow def stop(ctx, graph): - for node_instance in ctx.model.node_instance.iter(): - graph.add_tasks(WorkflowTask(stop_node_instance, node_instance=node_instance)) + for node in ctx.model.node.iter(): + graph.add_tasks(WorkflowTask(stop_node, node=node)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/orchestrator/workflows/builtin/uninstall.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/uninstall.py b/aria/orchestrator/workflows/builtin/uninstall.py index bfcc9ec..e4afcd9 100644 --- a/aria/orchestrator/workflows/builtin/uninstall.py +++ b/aria/orchestrator/workflows/builtin/uninstall.py @@ -17,18 +17,18 @@ Builtin uninstall workflow """ -from .workflows import uninstall_node_instance -from .utils import create_node_instance_task_dependencies +from .workflows import uninstall_node +from .utils import create_node_task_dependencies from ..api.task import WorkflowTask from ... import workflow @workflow def uninstall(ctx, graph): - tasks_and_node_instances = [] - for node_instance in ctx.model.node_instance.iter(): - tasks_and_node_instances.append(( - WorkflowTask(uninstall_node_instance, node_instance=node_instance), - node_instance)) - graph.add_tasks([task for task, _ in tasks_and_node_instances]) - create_node_instance_task_dependencies(graph, tasks_and_node_instances, reverse=True) + tasks_and_nodes = [] + for node in ctx.nodes: + tasks_and_nodes.append(( + WorkflowTask(uninstall_node, node=node), + node)) + graph.add_tasks([task for task, _ in tasks_and_nodes]) + create_node_task_dependencies(graph, tasks_and_nodes, reverse=True) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/orchestrator/workflows/builtin/utils.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/utils.py b/aria/orchestrator/workflows/builtin/utils.py index 8b7a8bc..c9dbc6b 100644 --- a/aria/orchestrator/workflows/builtin/utils.py +++ b/aria/orchestrator/workflows/builtin/utils.py @@ -16,50 +16,49 @@ from ..api.task import OperationTask -def create_node_instance_task(operation_name, node_instance): +def create_node_task(operation_name, node): """ - Returns a new operation task if the operation exists in the node instance, otherwise returns - None. + Returns a new operation task if the operation exists in the node, otherwise returns None. """ - if operation_name in node_instance.node.operations: - return OperationTask.node_instance(instance=node_instance, - name=operation_name) + if _has_operation(node.interfaces, operation_name): + return OperationTask.node(instance=node, + name=operation_name) return None -def create_relationship_instance_tasks(operation_name, operations_attr, node_instance): +def create_relationship_tasks(operation_name, runs_on, node): """ - Returns a list of operation tasks for each outbound relationship of the node instance if - the operation exists there. + Returns a list of operation tasks for each outbound relationship of the node if the operation + exists there. """ sequence = [] - for relationship_instance in node_instance.outbound_relationship_instances: - if operation_name in getattr(relationship_instance.relationship, operations_attr): + for relationship in node.outbound_relationships: + if _has_operation(relationship.interfaces, operation_name): sequence.append( - OperationTask.relationship_instance(instance=relationship_instance, - name=operation_name, - operation_end=operations_attr)) + OperationTask.relationship(instance=relationship, + name=operation_name, + edge='source', + runs_on=runs_on)) return sequence -def create_node_instance_task_dependencies(graph, tasks_and_node_instances, reverse=False): +def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False): """ - Creates dependencies between tasks if there is an outbound relationship between their node - instances. + Creates dependencies between tasks if there is a relationship (outbound) between their nodes. """ - def get_task(node_instance_id): - for task, node_instance in tasks_and_node_instances: - if node_instance.id == node_instance_id: + def get_task(node_id): + for task, node in tasks_and_nodes: + if node.id == node_id: return task return None - for task, node_instance in tasks_and_node_instances: + for task, node in tasks_and_nodes: dependencies = [] - for relationship_instance in node_instance.outbound_relationship_instances: - dependency = get_task(relationship_instance.target_node_instance.id) + for relationship in node.outbound_relationships: + dependency = get_task(relationship.target_node.id) if dependency: dependencies.append(dependency) if dependencies: @@ -68,3 +67,10 @@ def create_node_instance_task_dependencies(graph, tasks_and_node_instances, reve graph.add_dependency(dependency, task) else: graph.add_dependency(task, dependencies) + + +def _has_operation(interfaces, operation_name): + for interface in interfaces: + if interface.operations.filter_by(name=operation_name).count() == 1: + return True + return False http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/orchestrator/workflows/builtin/workflows.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/workflows.py b/aria/orchestrator/workflows/builtin/workflows.py index c2fbded..180b4e9 100644 --- a/aria/orchestrator/workflows/builtin/workflows.py +++ b/aria/orchestrator/workflows/builtin/workflows.py @@ -17,122 +17,157 @@ A set of builtin workflows. """ -from .utils import (create_node_instance_task, create_relationship_instance_tasks) +from .utils import (create_node_task, create_relationship_tasks) from ... import workflow +NORMATIVE_STANDARD_INTERFACE = 'Standard' # 'tosca.interfaces.node.lifecycle.Standard' +NORMATIVE_CONFIGURE_INTERFACE = 'Configure' # 'tosca.interfaces.relationship.Configure' + +NORMATIVE_CREATE = NORMATIVE_STANDARD_INTERFACE + '.create' +NORMATIVE_START = NORMATIVE_STANDARD_INTERFACE + '.start' +NORMATIVE_STOP = NORMATIVE_STANDARD_INTERFACE + '.stop' +NORMATIVE_DELETE = NORMATIVE_STANDARD_INTERFACE + '.delete' + +NORMATIVE_CONFIGURE = NORMATIVE_STANDARD_INTERFACE + '.configure' +NORMATIVE_PRE_CONFIGURE_SOURCE = NORMATIVE_CONFIGURE_INTERFACE + '.pre_configure_source' +NORMATIVE_PRE_CONFIGURE_TARGET = NORMATIVE_CONFIGURE_INTERFACE + '.pre_configure_target' +NORMATIVE_POST_CONFIGURE_SOURCE = NORMATIVE_CONFIGURE_INTERFACE + '.post_configure_source' +NORMATIVE_POST_CONFIGURE_TARGET = NORMATIVE_CONFIGURE_INTERFACE + '.post_configure_target' + +NORMATIVE_ADD_SOURCE = NORMATIVE_CONFIGURE_INTERFACE + '.add_source' +NORMATIVE_ADD_TARGET = NORMATIVE_CONFIGURE_INTERFACE + '.add_target' +NORMATIVE_REMOVE_TARGET = NORMATIVE_CONFIGURE_INTERFACE + '.remove_target' +NORMATIVE_TARGET_CHANGED = NORMATIVE_CONFIGURE_INTERFACE + '.target_changed' + + __all__ = ( - 'install_node_instance', - 'uninstall_node_instance', - 'start_node_instance', - 'stop_node_instance', + 'NORMATIVE_STANDARD_INTERFACE', + 'NORMATIVE_CONFIGURE_INTERFACE', + 'NORMATIVE_CREATE', + 'NORMATIVE_START', + 'NORMATIVE_STOP', + 'NORMATIVE_DELETE', + 'NORMATIVE_CONFIGURE', + 'NORMATIVE_PRE_CONFIGURE_SOURCE', + 'NORMATIVE_PRE_CONFIGURE_TARGET', + 'NORMATIVE_POST_CONFIGURE_SOURCE', + 'NORMATIVE_POST_CONFIGURE_TARGET', + 'NORMATIVE_ADD_SOURCE', + 'NORMATIVE_ADD_TARGET', + 'NORMATIVE_REMOVE_TARGET', + 'NORMATIVE_TARGET_CHANGED', + 'install_node', + 'uninstall_node', + 'start_node', + 'stop_node', ) -@workflow(suffix_template='{node_instance.id}') -def install_node_instance(graph, node_instance, **kwargs): +@workflow(suffix_template='{node.id}') +def install_node(graph, node, **kwargs): sequence = [] # Create sequence.append( - create_node_instance_task( - 'tosca.interfaces.node.lifecycle.Standard.create', - node_instance)) + create_node_task( + NORMATIVE_CREATE, + node)) # Configure sequence += \ - create_relationship_instance_tasks( - 'tosca.interfaces.relationship.Configure.pre_configure_source', - 'source_operations', - node_instance) + create_relationship_tasks( + NORMATIVE_PRE_CONFIGURE_SOURCE, + 'source', + node) sequence += \ - create_relationship_instance_tasks( - 'tosca.interfaces.relationship.Configure.pre_configure_target', - 'target_operations', - node_instance) + create_relationship_tasks( + NORMATIVE_PRE_CONFIGURE_TARGET, + 'target', + node) sequence.append( - create_node_instance_task( - 'tosca.interfaces.node.lifecycle.Standard.configure', - node_instance)) + create_node_task( + NORMATIVE_CONFIGURE, + node)) sequence += \ - create_relationship_instance_tasks( - 'tosca.interfaces.relationship.Configure.post_configure_source', - 'source_operations', - node_instance) + create_relationship_tasks( + NORMATIVE_POST_CONFIGURE_SOURCE, + 'source', + node) sequence += \ - create_relationship_instance_tasks( - 'tosca.interfaces.relationship.Configure.post_configure_target', - 'target_operations', - node_instance) + create_relationship_tasks( + NORMATIVE_POST_CONFIGURE_TARGET, + 'target', + node) # Start - sequence += _create_start_tasks(node_instance) + sequence += _create_start_tasks(node) graph.sequence(*sequence) -@workflow(suffix_template='{node_instance.id}') -def uninstall_node_instance(graph, node_instance, **kwargs): +@workflow(suffix_template='{node.id}') +def uninstall_node(graph, node, **kwargs): # Stop - sequence = _create_stop_tasks(node_instance) + sequence = _create_stop_tasks(node) # Delete sequence.append( - create_node_instance_task( - 'tosca.interfaces.node.lifecycle.Standard.delete', - node_instance)) + create_node_task( + NORMATIVE_DELETE, + node)) graph.sequence(*sequence) -@workflow(suffix_template='{node_instance.id}') -def start_node_instance(graph, node_instance, **kwargs): - graph.sequence(*_create_start_tasks(node_instance)) +@workflow(suffix_template='{node.id}') +def start_node(graph, node, **kwargs): + graph.sequence(*_create_start_tasks(node)) -@workflow(suffix_template='{node_instance.id}') -def stop_node_instance(graph, node_instance, **kwargs): - graph.sequence(*_create_stop_tasks(node_instance)) +@workflow(suffix_template='{node.id}') +def stop_node(graph, node, **kwargs): + graph.sequence(*_create_stop_tasks(node)) -def _create_start_tasks(node_instance): +def _create_start_tasks(node): sequence = [] sequence.append( - create_node_instance_task( - 'tosca.interfaces.node.lifecycle.Standard.start', - node_instance)) + create_node_task( + NORMATIVE_START, + node)) sequence += \ - create_relationship_instance_tasks( - 'tosca.interfaces.relationship.Configure.add_source', - 'source_operations', - node_instance) + create_relationship_tasks( + NORMATIVE_ADD_SOURCE, + 'source', + node) sequence += \ - create_relationship_instance_tasks( - 'tosca.interfaces.relationship.Configure.add_target', - 'target_operations', - node_instance) + create_relationship_tasks( + NORMATIVE_ADD_TARGET, + 'target', + node) sequence += \ - create_relationship_instance_tasks( - 'tosca.interfaces.relationship.Configure.target_changed', - 'target_operations', - node_instance) + create_relationship_tasks( + NORMATIVE_TARGET_CHANGED, + 'target', + node) return sequence -def _create_stop_tasks(node_instance): +def _create_stop_tasks(node): sequence = [] sequence += \ - create_relationship_instance_tasks( - 'tosca.interfaces.relationship.Configure.remove_target', - 'target_operations', - node_instance) + create_relationship_tasks( + NORMATIVE_REMOVE_TARGET, + 'target', + node) sequence += \ - create_relationship_instance_tasks( - 'tosca.interfaces.relationship.Configure.target_changed', - 'target_operations', - node_instance) + create_relationship_tasks( + NORMATIVE_TARGET_CHANGED, + 'target', + node) sequence.append( - create_node_instance_task( - 'tosca.interfaces.node.lifecycle.Standard.stop', - node_instance)) + create_node_task( + NORMATIVE_STOP, + node)) return sequence http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index fd83614..55b4159 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -23,7 +23,7 @@ from datetime import datetime import networkx from aria import logger -from aria.storage import model +from aria.storage.modeling import model from aria.orchestrator import events from .. import exceptions http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py index f65fc0d..d0e1363 100644 --- a/aria/orchestrator/workflows/core/task.py +++ b/aria/orchestrator/workflows/core/task.py @@ -24,7 +24,7 @@ from functools import ( ) from aria import logger -from aria.storage import model +from aria.storage.modeling import model from aria.orchestrator.context import operation as operation_context from .. import exceptions @@ -109,11 +109,11 @@ class OperationTask(BaseTask): model_storage = api_task._workflow_context.model base_task_model = model_storage.task.model_cls - if isinstance(api_task.actor, model.NodeInstance): - context_class = operation_context.NodeOperationContext + if isinstance(api_task.actor, model.Node): + context_cls = operation_context.NodeOperationContext task_model_cls = base_task_model.as_node_instance - elif isinstance(api_task.actor, model.RelationshipInstance): - context_class = operation_context.RelationshipOperationContext + elif isinstance(api_task.actor, model.Relationship): + context_cls = operation_context.RelationshipOperationContext task_model_cls = base_task_model.as_relationship_instance else: raise RuntimeError('No operation context could be created for {actor.model_cls}' @@ -127,7 +127,7 @@ class OperationTask(BaseTask): # package_name and package_version operation_task = task_model_cls( name=api_task.name, - operation_mapping=api_task.operation_mapping, + implementation=api_task.implementation, instance=api_task.actor, inputs=api_task.inputs, status=base_task_model.PENDING, @@ -141,13 +141,13 @@ class OperationTask(BaseTask): ) self._workflow_context.model.task.put(operation_task) - self._ctx = context_class(name=api_task.name, - model_storage=self._workflow_context.model, - resource_storage=self._workflow_context.resource, - deployment_id=self._workflow_context._deployment_id, - task_id=operation_task.id, - actor_id=api_task.actor.id, - workdir=self._workflow_context._workdir) + self._ctx = context_cls(name=api_task.name, + model_storage=self._workflow_context.model, + resource_storage=self._workflow_context.resource, + service_instance_id=self._workflow_context._service_instance_id, + task_id=operation_task.id, + actor_id=api_task.actor.id, + workdir=self._workflow_context._workdir) self._task_id = operation_task.id self._update_fields = None http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index c4b8ba1..560ac43 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -19,8 +19,8 @@ Subprocess based executor # pylint: disable=wrong-import-position -import sys import os +import sys # As part of the process executor implementation, subprocess are started with this module as their # entry point. We thus remove this module's directory from the python path if it happens to be @@ -47,7 +47,7 @@ from aria.utils import imports from aria.utils import exceptions from aria.orchestrator.workflows.executor import base from aria.storage import instrumentation -from aria.storage import type as storage_type +from aria.storage.modeling import type as storage_type _IS_WIN = os.name == 'nt' @@ -190,7 +190,7 @@ class ProcessExecutor(base.BaseExecutor): def _create_arguments_dict(self, task): return { 'task_id': task.id, - 'operation_mapping': task.operation_mapping, + 'implementation': task.implementation, 'operation_inputs': task.inputs, 'port': self._server_port, 'context': task.context.serialization_dict, @@ -281,9 +281,9 @@ def _patch_session(ctx, messenger, instrument): if not ctx.model: return - # We arbitrarily select the ``node_instance`` mapi to extract the session from it. + # We arbitrarily select the ``node`` mapi to extract the session from it. # could have been any other mapi just as well - session = ctx.model.node_instance._session + session = ctx.model.node._session original_refresh = session.refresh def patched_refresh(target): @@ -317,7 +317,7 @@ def _main(): messenger = _Messenger(task_id=task_id, port=port) messenger.started() - operation_mapping = arguments['operation_mapping'] + implementation = arguments['implementation'] operation_inputs = arguments['operation_inputs'] context_dict = arguments['context'] @@ -329,7 +329,7 @@ def _main(): try: ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context']) _patch_session(ctx=ctx, messenger=messenger, instrument=instrument) - task_func = imports.load_attribute(operation_mapping) + task_func = imports.load_attribute(implementation) aria.install_aria_extensions() for decorate in process_executor.decorate(): task_func = decorate(task_func) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py index 1a6ad9f..7ae0217 100644 --- a/aria/orchestrator/workflows/executor/thread.py +++ b/aria/orchestrator/workflows/executor/thread.py @@ -57,7 +57,7 @@ class ThreadExecutor(BaseExecutor): task = self._queue.get(timeout=1) self._task_started(task) try: - task_func = imports.load_attribute(task.operation_mapping) + task_func = imports.load_attribute(task.implementation) task_func(ctx=task.context, **task.inputs) self._task_succeeded(task) except BaseException as e: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/parser/modeling/__init__.py ---------------------------------------------------------------------- diff --git a/aria/parser/modeling/__init__.py b/aria/parser/modeling/__init__.py index a1efd9f..cad25ca 100644 --- a/aria/parser/modeling/__init__.py +++ b/aria/parser/modeling/__init__.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .exceptions import CannotEvaluateFunctionException from .context import IdType, ModelingContext from .elements import Element, ModelElement, Function, Parameter, Metadata from .instance_elements import (ServiceInstance, Node, Capability, Relationship, Artifact, Group, @@ -24,10 +23,10 @@ from .model_elements import (ServiceModel, NodeTemplate, RequirementTemplate, Ca GroupPolicyTemplate, GroupPolicyTriggerTemplate, MappingTemplate, SubstitutionTemplate, InterfaceTemplate, OperationTemplate) from .types import TypeHierarchy, Type, RelationshipType, PolicyType, PolicyTriggerType -from .storage import initialize_storage +from .exceptions import CannotEvaluateFunctionException + __all__ = ( - 'CannotEvaluateFunctionException', 'IdType', 'ModelingContext', 'Element', @@ -67,4 +66,5 @@ __all__ = ( 'RelationshipType', 'PolicyType', 'PolicyTriggerType', - 'initialize_storage') + 'CannotEvaluateFunctionException', +) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/parser/modeling/storage.py ---------------------------------------------------------------------- diff --git a/aria/parser/modeling/storage.py b/aria/parser/modeling/storage.py index 46c3a7c..ff1e536 100644 --- a/aria/parser/modeling/storage.py +++ b/aria/parser/modeling/storage.py @@ -13,167 +13,129 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" +This solution is temporary, as we plan to combine aria.parser.modeling and aria.storage.modeling +into one package (aria.modeling?). +""" + from datetime import datetime from threading import RLock -from ...storage import model -from ...orchestrator import operation +from ...storage.modeling import model +from ...orchestrator.decorators import operation from ...utils.console import puts, Colored from ...utils.formatting import safe_repr -def initialize_storage(context, model_storage, deployment_id): - blueprint = create_blueprint(context) - model_storage.blueprint.put(blueprint) - - deployment = create_deployment(context, blueprint, deployment_id) - model_storage.deployment.put(deployment) - - # Create nodes and node instances - for node_template in context.modeling.model.node_templates.itervalues(): - node = create_node(context, deployment, node_template) - model_storage.node.put(node) +def initialize_storage(context, model_storage, service_instance_id): + s_service_template = create_service_template(context) + model_storage.service_template.put(s_service_template) - for a_node in context.modeling.instance.find_nodes(node_template.name): - node_instance = create_node_instance(node, a_node) - model_storage.node_instance.put(node_instance) + s_service_instance = create_service_instance(context, s_service_template, service_instance_id) + model_storage.service_instance.put(s_service_instance) - # Create relationships + # Create node templates and nodes for node_template in context.modeling.model.node_templates.itervalues(): - for index, requirement_template in enumerate(node_template.requirement_templates): - # We are currently limited only to requirements for specific node templates! - if requirement_template.target_node_template_name: - source = model_storage.node.get_by_name(node_template.name) - target = model_storage.node.get_by_name( - requirement_template.target_node_template_name) - relationship = create_relationship(context, source, target, - requirement_template.relationship_template) - model_storage.relationship.put(relationship) - - for node in context.modeling.instance.find_nodes(node_template.name): - for relationship_model in node.relationships: - if relationship_model.source_requirement_index == index: - source_instance = \ - model_storage.node_instance.get_by_name(node.id) - target_instance = \ - model_storage.node_instance.get_by_name( - relationship_model.target_node_id) - relationship_instance = \ - create_relationship_instance(relationship, source_instance, - target_instance) - model_storage.relationship_instance.put(relationship_instance) - - -def create_blueprint(context): + s_node_template = create_node_template(s_service_template, node_template) + model_storage.node_template.put(s_node_template) + + for node in context.modeling.instance.find_nodes(node_template.name): + s_node = create_node(s_service_instance, s_node_template, node) + model_storage.node.put(s_node) + create_interfaces(context, model_storage, node.interfaces, + s_node, 'node', None, '_dry_node') + + # Create relationships between nodes + for source_node in context.modeling.instance.nodes.itervalues(): + for relationship in source_node.relationships: + s_source_node = model_storage.node.get_by_name(source_node.id) + s_target_node = model_storage.node.get_by_name(relationship.target_node_id) + s_relationship = create_relationship(s_source_node, s_target_node) + model_storage.relationship.put(s_relationship) + # TOSCA always uses the "source" edge + create_interfaces(context, model_storage, relationship.source_interfaces, + s_relationship, 'relationship', 'source', '_dry_relationship') + + +def create_service_template(context): now = datetime.utcnow() main_file_name = unicode(context.presentation.location) try: name = context.modeling.model.metadata.values.get('template_name') except AttributeError: name = None - return model.Blueprint( - plan={}, + return model.ServiceTemplate( name=name or main_file_name, description=context.modeling.model.description or '', created_at=now, updated_at=now, - main_file_name=main_file_name) + main_file_name=main_file_name, + plan={} + ) -def create_deployment(context, blueprint, deployment_id): +def create_service_instance(context, service_template, service_instance_id): now = datetime.utcnow() - return model.Deployment( - name='%s_%s' % (blueprint.name, deployment_id), - blueprint_fk=blueprint.id, + return model.ServiceInstance( + name='{0}_{1}'.format(service_template.name, service_instance_id), + service_template=service_template, description=context.modeling.instance.description or '', created_at=now, - updated_at=now, - workflows={}, - inputs={}, - groups={}, - permalink='', - policy_triggers={}, - policy_types={}, - outputs={}, - scaling_groups={}) - - -def create_node(context, deployment, node_template): - operations = create_operations(context, node_template.interface_templates, '_dry_node') - return model.Node( - name=node_template.name, - type=node_template.type_name, - type_hierarchy=[], - number_of_instances=node_template.default_instances, - planned_number_of_instances=node_template.default_instances, - deploy_number_of_instances=node_template.default_instances, - properties={}, - operations=operations, - min_number_of_instances=node_template.min_instances, - max_number_of_instances=node_template.max_instances or 100, - deployment_fk=deployment.id) - - -def create_relationship(context, source, target, relationship_template): - if relationship_template: - source_operations = create_operations(context, - relationship_template.source_interface_templates, - '_dry_relationship') - target_operations = create_operations(context, - relationship_template.target_interface_templates, - '_dry_relationship') - else: - source_operations = {} - target_operations = {} - return model.Relationship( - source_node_fk=source.id, - target_node_fk=target.id, - source_interfaces={}, - source_operations=source_operations, - target_interfaces={}, - target_operations=target_operations, - type='rel_type', - type_hierarchy=[], - properties={}) - - -def create_node_instance(node, node_model): - return model.NodeInstance( - name=node_model.id, - runtime_properties={}, - version=None, - node_fk=node.id, - state='', - scaling_groups=[]) + updated_at=now) -def create_relationship_instance(relationship, source_instance, target_instance): - return model.RelationshipInstance( - relationship_fk=relationship.id, - source_node_instance_fk=source_instance.id, - target_node_instance_fk=target_instance.id) +def create_node_template(service_template, node_template): + return model.NodeTemplate( + name=node_template.name, + type_name=node_template.type_name, + default_instances=node_template.default_instances, + min_instances=node_template.min_instances, + max_instances=node_template.max_instances or 100, + service_template=service_template) -def create_operations(context, interfaces, fn_name): - operations = {} - for interface in interfaces.itervalues(): - operations[interface.type_name] = {} - for oper in interface.operation_templates.itervalues(): - name = '%s.%s' % (interface.type_name, oper.name) - operations[name] = { - 'operation': '%s.%s' % (__name__, fn_name), - 'inputs': { - '_plugin': None, - '_implementation': None}} - if oper.implementation: - plugin, implementation = _parse_implementation(context, oper.implementation) - operations[name]['inputs']['_plugin'] = plugin - operations[name]['inputs']['_implementation'] = implementation +def create_node(service_instance, node_template, node): + return model.Node( + name=node.id, + state='', + node_template=node_template, + service_instance=service_instance) - return operations + +def create_relationship(source_node, target_node): + return model.Relationship( + source_node=source_node, + target_node=target_node) + + +def create_interfaces(context, model_storage, interfaces, node_or_relationship, type_name, edge, + fn_name): + for interface_name, interface in interfaces.iteritems(): + s_interface = model.Interface(name=interface_name, + type_name=interface.type_name, + edge=edge) + setattr(s_interface, type_name, node_or_relationship) + model_storage.interface.put(s_interface) + for operation_name, oper in interface.operations.iteritems(): + operation_name = '{0}.{1}'.format(interface_name, operation_name) + s_operation = model.Operation(name=operation_name, + implementation='{0}.{1}'.format(__name__, fn_name), + interface=s_interface) + plugin, implementation = _parse_implementation(context, oper.implementation) + # TODO: operation's user inputs + s_operation.inputs.append(model.Parameter(name='_plugin', # pylint: disable=no-member + str_value=str(plugin), + type='str')) + s_operation.inputs.append(model.Parameter(name='_implementation', # pylint: disable=no-member + str_value=str(implementation), + type='str')) + model_storage.operation.put(s_operation) def _parse_implementation(context, implementation): + if not implementation: + return '', '' + index = implementation.find('>') if index == -1: return 'execution', implementation @@ -204,7 +166,7 @@ _TERMINAL_LOCK = RLock() @operation def _dry_node(ctx, _plugin, _implementation, **kwargs): with _TERMINAL_LOCK: - print '> node instance: %s' % Colored.red(ctx.node_instance.name) + print '> node instance: %s' % Colored.red(ctx.node.name) _dump_implementation(_plugin, _implementation) @@ -212,8 +174,8 @@ def _dry_node(ctx, _plugin, _implementation, **kwargs): def _dry_relationship(ctx, _plugin, _implementation, **kwargs): with _TERMINAL_LOCK: puts('> relationship instance: %s -> %s' % ( - Colored.red(ctx.relationship_instance.source_node_instance.name), - Colored.red(ctx.relationship_instance.target_node_instance.name))) + Colored.red(ctx.relationship.source_node.name), + Colored.red(ctx.relationship.target_node.name))) _dump_implementation(_plugin, _implementation) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/parser/modeling/utils.py ---------------------------------------------------------------------- diff --git a/aria/parser/modeling/utils.py b/aria/parser/modeling/utils.py index 906106e..21db433 100644 --- a/aria/parser/modeling/utils.py +++ b/aria/parser/modeling/utils.py @@ -57,7 +57,7 @@ def coerce_value(context, container, value, report_issues=False): return [coerce_value(context, container, v, report_issues) for v in value] elif isinstance(value, dict): return OrderedDict((k, coerce_value(context, container, v, report_issues)) - for k, v in value.iteritems()) + for k, v in value.items()) elif hasattr(value, '_evaluate'): try: value = value._evaluate(context, container) @@ -73,7 +73,7 @@ def coerce_value(context, container, value, report_issues=False): def validate_dict_values(context, the_dict): if not the_dict: return - validate_list_values(context, the_dict.itervalues()) + validate_list_values(context, the_dict.values()) def validate_list_values(context, the_list): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/storage/__init__.py ---------------------------------------------------------------------- diff --git a/aria/storage/__init__.py b/aria/storage/__init__.py index a1c07d7..eaadc7e 100644 --- a/aria/storage/__init__.py +++ b/aria/storage/__init__.py @@ -42,14 +42,18 @@ from .core import ( ModelStorage, ResourceStorage, ) +from .modeling import ( + structure, + model, + model_base, + type +) from . import ( exceptions, api, - structure, core, filesystem_rapi, sql_mapi, - model ) __all__ = ( @@ -60,5 +64,8 @@ __all__ = ( 'ResourceStorage', 'filesystem_rapi', 'sql_mapi', - 'api' + 'api', + 'model', + 'model_base', + 'type', ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b6193359/aria/storage/api.py ---------------------------------------------------------------------- diff --git a/aria/storage/api.py b/aria/storage/api.py index f6da6de..09a4dd9 100644 --- a/aria/storage/api.py +++ b/aria/storage/api.py @@ -44,7 +44,7 @@ class ModelAPI(StorageAPI): """ super(ModelAPI, self).__init__(**kwargs) self._model_cls = model_cls - self._name = name or generate_lower_name(model_cls) + self._name = name or model_cls.__modelname__ @property def name(self): @@ -178,6 +178,4 @@ def generate_lower_name(model_cls): :return: lower name :rtype: basestring """ - return ''.join( - character if character.islower() else '_{0}'.format(character.lower()) - for character in model_cls.__name__)[1:] + return getattr(model_cls, '__mapiname__', model_cls.__tablename__)