ARIA-44-Merge-parser-and-storage-models
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/55556793 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/55556793 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/55556793 Branch: refs/heads/ARIA-44-Merge-parser-and-storage-models Commit: 55556793050088bd09988a8325b956825fa6df9a Parents: 9e62fca Author: mxmrlv <[email protected]> Authored: Thu Jan 19 11:39:36 2017 +0200 Committer: mxmrlv <[email protected]> Committed: Tue Jan 31 14:55:48 2017 +0200 ---------------------------------------------------------------------- aria/__init__.py | 49 +- aria/cli/commands.py | 2 +- aria/orchestrator/context/common.py | 22 +- aria/orchestrator/context/operation.py | 41 +- aria/orchestrator/context/serialization.py | 2 +- aria/orchestrator/context/toolbelt.py | 2 +- aria/orchestrator/context/workflow.py | 16 +- aria/orchestrator/runner.py | 13 +- aria/orchestrator/workflows/api/task.py | 126 +- aria/orchestrator/workflows/core/engine.py | 2 +- aria/orchestrator/workflows/core/task.py | 26 +- aria/orchestrator/workflows/executor/process.py | 14 +- aria/orchestrator/workflows/executor/thread.py | 2 +- aria/parser/modeling/__init__.py | 7 +- aria/parser/modeling/storage.py | 224 --- aria/parser/modeling/utils.py | 4 +- aria/storage/__init__.py | 13 +- aria/storage/api.py | 6 +- aria/storage/base_model.py | 757 ---------- aria/storage/core.py | 3 +- aria/storage/instrumentation.py | 15 +- aria/storage/model.py | 110 -- aria/storage/modeling/__init__.py | 37 + aria/storage/modeling/elements.py | 102 ++ aria/storage/modeling/instance_elements.py | 1257 ++++++++++++++++ aria/storage/modeling/model.py | 175 +++ aria/storage/modeling/orchestrator_elements.py | 461 ++++++ aria/storage/modeling/structure.py | 320 +++++ aria/storage/modeling/template_elements.py | 1348 ++++++++++++++++++ aria/storage/modeling/type.py | 302 ++++ aria/storage/modeling/utils.py | 139 ++ aria/storage/structure.py | 190 --- aria/storage/type.py | 299 ---- aria/storage_initializer.py | 135 ++ aria/utils/application.py | 14 +- .../simple_v1_0/modeling/data_types.py | 4 +- tests/mock/context.py | 4 +- tests/mock/models.py | 119 +- tests/mock/topology.py | 99 +- tests/orchestrator/context/test_operation.py | 119 +- .../context/test_resource_render.py | 2 +- tests/orchestrator/context/test_serialize.py | 17 +- tests/orchestrator/context/test_toolbelt.py | 72 +- tests/orchestrator/context/test_workflow.py | 14 +- .../orchestrator/execution_plugin/test_local.py | 61 +- tests/orchestrator/execution_plugin/test_ssh.py | 19 +- tests/orchestrator/test_runner.py | 9 +- tests/orchestrator/workflows/__init__.py | 2 +- tests/orchestrator/workflows/api/test_task.py | 77 +- .../workflows/builtin/test_execute_operation.py | 56 - .../workflows/builtin/test_install.py | 43 - .../workflows/builtin/test_uninstall.py | 44 - .../orchestrator/workflows/core/test_engine.py | 23 +- tests/orchestrator/workflows/core/test_task.py | 74 +- .../test_task_graph_into_exececution_graph.py | 14 +- .../workflows/executor/test_executor.py | 9 +- .../workflows/executor/test_process_executor.py | 10 +- .../executor/test_process_executor_extension.py | 13 +- .../test_process_executor_tracked_changes.py | 49 +- tests/resources/scripts/test_ssh.sh | 30 +- tests/storage/__init__.py | 15 +- tests/storage/test_instrumentation.py | 18 +- tests/storage/test_model_storage.py | 45 +- tests/storage/test_models.py | 919 ------------ tests/storage/test_structures.py | 8 +- 65 files changed, 4997 insertions(+), 3226 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/__init__.py ---------------------------------------------------------------------- diff --git a/aria/__init__.py b/aria/__init__.py index 248aa1a..7f9fe8c 100644 --- a/aria/__init__.py +++ b/aria/__init__.py @@ -61,25 +61,46 @@ def application_model_storage(api, api_kwargs=None): """ Initiate model storage """ - models = [ - storage.model.Plugin, + models_to_register = [ + storage.modeling.model.Parameter, - storage.model.Blueprint, - storage.model.Deployment, - storage.model.DeploymentUpdate, - storage.model.DeploymentUpdateStep, - storage.model.DeploymentModification, + storage.modeling.model.MappingTemplate, + storage.modeling.model.SubstitutionTemplate, + storage.modeling.model.ServiceTemplate, + storage.modeling.model.NodeTemplate, + storage.modeling.model.GroupTemplate, + storage.modeling.model.InterfaceTemplate, + storage.modeling.model.OperationTemplate, + storage.modeling.model.ArtifactTemplate, + storage.modeling.model.PolicyTemplate, + storage.modeling.model.GroupPolicyTemplate, + storage.modeling.model.GroupPolicyTriggerTemplate, + storage.modeling.model.RequirementTemplate, + storage.modeling.model.CapabilityTemplate, - storage.model.Node, - storage.model.NodeInstance, - storage.model.Relationship, - storage.model.RelationshipInstance, + storage.modeling.model.Mapping, + storage.modeling.model.Substitution, + storage.modeling.model.ServiceInstance, + storage.modeling.model.Node, + storage.modeling.model.Group, + storage.modeling.model.Interface, + storage.modeling.model.Operation, + storage.modeling.model.Capability, + storage.modeling.model.Artifact, + storage.modeling.model.Policy, + storage.modeling.model.GroupPolicy, + storage.modeling.model.GroupPolicyTrigger, + storage.modeling.model.Relationship, - storage.model.Execution, - storage.model.Task, + storage.modeling.model.Execution, + storage.modeling.model.ServiceInstanceUpdate, + storage.modeling.model.ServiceInstanceUpdateStep, + storage.modeling.model.ServiceInstanceModification, + storage.modeling.model.Plugin, + storage.modeling.model.Task ] # if api not in _model_storage: - return storage.ModelStorage(api, items=models, api_kwargs=api_kwargs or {}) + return storage.ModelStorage(api, items=models_to_register, api_kwargs=api_kwargs or {}) def application_resource_storage(api, api_kwargs=None): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/cli/commands.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands.py b/aria/cli/commands.py index 0890cd1..45c866a 100644 --- a/aria/cli/commands.py +++ b/aria/cli/commands.py @@ -366,7 +366,7 @@ class ExecuteCommand(BaseCommand): FileSystemResourceDriver(local_resource_storage())) model_storage = application_model_storage( FileSystemModelDriver(local_model_storage())) - deployment = model_storage.deployment.get(args_namespace.deployment_id) + deployment = model_storage.service_instance.get(args_namespace.deployment_id) try: workflow = deployment.workflows[args_namespace.workflow_id] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 6ab27ef..37482cf 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -31,7 +31,7 @@ class BaseContext(logger.LoggerMixin): def __init__( self, name, - deployment_id, + service_instance_id, model_storage, resource_storage, workdir=None, @@ -41,13 +41,13 @@ class BaseContext(logger.LoggerMixin): self._id = str(uuid4()) self._model = model_storage self._resource = resource_storage - self._deployment_id = deployment_id + self._service_instance_id = service_instance_id self._workdir = workdir def __repr__(self): return ( '{name}(name={self.name}, ' - 'deployment_id={self._deployment_id}, ' + 'deployment_id={self._service_instance_id}, ' .format(name=self.__class__.__name__, self=self)) @property @@ -67,18 +67,18 @@ class BaseContext(logger.LoggerMixin): return self._resource @property - def blueprint(self): + def service_template(self): """ The blueprint model """ - return self.deployment.blueprint + return self.service_instance.service_template @property - def deployment(self): + def service_instance(self): """ The deployment model """ - return self.model.deployment.get(self._deployment_id) + return self.model.service_instance.get(self._service_instance_id) @property def name(self): @@ -101,11 +101,11 @@ class BaseContext(logger.LoggerMixin): Download a blueprint resource from the resource storage """ try: - self.resource.deployment.download(entry_id=str(self.deployment.id), + self.resource.deployment.download(entry_id=str(self.service_instance.id), destination=destination, path=path) except exceptions.StorageError: - self.resource.blueprint.download(entry_id=str(self.blueprint.id), + self.resource.blueprint.download(entry_id=str(self.service_template.id), destination=destination, path=path) @@ -126,9 +126,9 @@ class BaseContext(logger.LoggerMixin): Read a deployment resource as string from the resource storage """ try: - return self.resource.deployment.read(entry_id=str(self.deployment.id), path=path) + return self.resource.deployment.read(entry_id=str(self.service_instance.id), path=path) except exceptions.StorageError: - return self.resource.blueprint.read(entry_id=str(self.blueprint.id), path=path) + return self.resource.deployment.read(entry_id=str(self.service_template.id), path=path) def get_resource_and_render(self, path=None, variables=None): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index 23a6fd4..75a6b7f 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -30,7 +30,7 @@ class BaseOperationContext(BaseContext): name, model_storage, resource_storage, - deployment_id, + service_instance_id, task_id, actor_id, **kwargs): @@ -38,7 +38,7 @@ class BaseOperationContext(BaseContext): name=name, model_storage=model_storage, resource_storage=resource_storage, - deployment_id=deployment_id, + service_instance_id=service_instance_id, **kwargs) self._task_id = task_id self._actor_id = actor_id @@ -68,7 +68,7 @@ class BaseOperationContext(BaseContext): if not self.task.plugin_name: return None plugin_workdir = '{0}/plugins/{1}/{2}'.format(self._workdir, - self.deployment.id, + self.service_instance.id, self.task.plugin_name) file.makedirs(plugin_workdir) return plugin_workdir @@ -79,20 +79,20 @@ class NodeOperationContext(BaseOperationContext): Context for node based operations. """ @property - def node(self): + def node_template(self): """ the node of the current operation :return: """ - return self.node_instance.node + return self.node.node_template @property - def node_instance(self): + def node(self): """ The node instance of the current operation :return: """ - return self.model.node_instance.get(self._actor_id) + return self.model.node.get(self._actor_id) class RelationshipOperationContext(BaseOperationContext): @@ -100,50 +100,41 @@ class RelationshipOperationContext(BaseOperationContext): Context for relationship based operations. """ @property - def source_node(self): + def source_node_template(self): """ The source node :return: """ - return self.relationship.source_node + return self.source_node.node_template @property - def source_node_instance(self): + def source_node(self): """ The source node instance :return: """ - return self.relationship_instance.source_node_instance + return self.relationship.source_node @property - def target_node(self): + def target_node_template(self): """ The target node :return: """ - return self.relationship.target_node + return self.target_node.node_template @property - def target_node_instance(self): + def target_node(self): """ The target node instance :return: """ - return self.relationship_instance.target_node_instance + return self.relationship.target_node @property def relationship(self): """ - The relationship of the current operation - :return: - """ - - return self.relationship_instance.relationship - - @property - def relationship_instance(self): - """ The relationship instance of the current operation :return: """ - return self.model.relationship_instance.get(self._actor_id) + return self.model.relationship.get(self._actor_id) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/orchestrator/context/serialization.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/serialization.py b/aria/orchestrator/context/serialization.py index 760818f..dd88041 100644 --- a/aria/orchestrator/context/serialization.py +++ b/aria/orchestrator/context/serialization.py @@ -23,7 +23,7 @@ def operation_context_to_dict(context): context_cls = context.__class__ context_dict = { 'name': context.name, - 'deployment_id': context._deployment_id, + 'service_instance_id': context._service_instance_id, 'task_id': context._task_id, 'actor_id': context._actor_id, 'workdir': context._workdir http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/orchestrator/context/toolbelt.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/toolbelt.py b/aria/orchestrator/context/toolbelt.py index 301b013..def7d42 100644 --- a/aria/orchestrator/context/toolbelt.py +++ b/aria/orchestrator/context/toolbelt.py @@ -33,7 +33,7 @@ class NodeToolBelt(object): :return: """ assert isinstance(self._op_context, operation.NodeOperationContext) - host = self._op_context.node_instance.host + host = self._op_context.node.host return host.runtime_properties.get('ip') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py index a15790e..4a8d94f 100644 --- a/aria/orchestrator/context/workflow.py +++ b/aria/orchestrator/context/workflow.py @@ -49,18 +49,16 @@ class WorkflowContext(BaseContext): def __repr__(self): return ( - '{name}(deployment_id={self._deployment_id}, ' + '{name}(deployment_id={self._service_instance_id}, ' 'workflow_name={self._workflow_name}'.format( name=self.__class__.__name__, self=self)) def _create_execution(self): - execution_cls = self.model.execution.model_cls now = datetime.utcnow() execution = self.model.execution.model_cls( - deployment=self.deployment, + service_instance=self.service_instance, workflow_name=self._workflow_name, created_at=now, - status=execution_cls.PENDING, parameters=self.parameters, ) self.model.execution.put(execution) @@ -81,27 +79,27 @@ class WorkflowContext(BaseContext): self.model.execution.put(value) @property - def nodes(self): + def node_templates(self): """ Iterator over nodes """ - key = 'deployment_{0}'.format(self.model.node.model_cls.name_column_name()) + key = 'deployment_{0}'.format(self.model.node_template.model_cls.name_column_name()) return self.model.node.iter( filters={ - key: getattr(self.deployment, self.deployment.name_column_name()) + key: getattr(self.service_instance, self.service_instance.node_template()) } ) @property - def node_instances(self): + def nodes(self): """ Iterator over node instances """ key = 'deployment_{0}'.format(self.model.node_instance.model_cls.name_column_name()) return self.model.node_instance.iter( filters={ - key: getattr(self.deployment, self.deployment.name_column_name()) + key: getattr(self.service_instance, self.service_instance.name_column_name()) } ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/orchestrator/runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/runner.py b/aria/orchestrator/runner.py index 16acc19..57e0f83 100644 --- a/aria/orchestrator/runner.py +++ b/aria/orchestrator/runner.py @@ -50,7 +50,7 @@ class Runner(object): """ def __init__(self, workflow_name, workflow_fn, inputs, initialize_model_storage_fn, - deployment_id, storage_path='', is_storage_temporary=True): + service_instance_id, storage_path='', is_storage_temporary=True): if storage_path == '': # Temporary file storage the_file, storage_path = tempfile.mkstemp(suffix='.db', prefix='aria-') @@ -59,7 +59,7 @@ class Runner(object): self._storage_path = storage_path self._is_storage_temporary = is_storage_temporary - workflow_context = self.create_workflow_context(workflow_name, deployment_id, + workflow_context = self.create_workflow_context(workflow_name, service_instance_id, initialize_model_storage_fn) tasks_graph = workflow_fn(ctx=workflow_context, **inputs) @@ -75,7 +75,10 @@ class Runner(object): finally: self.cleanup() - def create_workflow_context(self, workflow_name, deployment_id, initialize_model_storage_fn): + def create_workflow_context(self, + workflow_name, + service_instance_id, + initialize_model_storage_fn): model_storage = self.create_sqlite_model_storage() initialize_model_storage_fn(model_storage) resource_storage = self.create_fs_resource_storage() @@ -83,7 +86,7 @@ class Runner(object): name=workflow_name, model_storage=model_storage, resource_storage=resource_storage, - deployment_id=deployment_id, + service_instance_id=service_instance_id, workflow_name=self.__class__.__name__, task_max_attempts=1, task_retry_interval=1) @@ -106,7 +109,7 @@ class Runner(object): 'sqlite:///%s%s' % (path_prefix, self._storage_path)) # Models - model.DeclarativeBase.metadata.create_all(bind=sqlite_engine) # @UndefinedVariable + model.DB.metadata.create_all(bind=sqlite_engine) # @UndefinedVariable # Session sqlite_session_factory = orm.sessionmaker(bind=sqlite_engine) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index 44715c1..50fe6a9 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -18,7 +18,7 @@ Provides the tasks to be entered into the task graph """ from uuid import uuid4 -from aria.storage import model +from aria.storage.modeling import model from ... import context from .. import exceptions @@ -57,13 +57,13 @@ class OperationTask(BaseTask): Represents an operation task in the task_graph """ - SOURCE_OPERATION = 'source_operations' - TARGET_OPERATION = 'target_operations' + SOURCE_OPERATION = 'source' + TARGET_OPERATION = 'target' def __init__(self, name, actor, - operation_mapping, + implementation, max_attempts=None, retry_interval=None, ignore_failure=None, @@ -76,12 +76,12 @@ class OperationTask(BaseTask): :param actor: the operation host on which this operation is registered. :param inputs: operation inputs. """ - assert isinstance(actor, (model.NodeInstance, - model.RelationshipInstance)) + assert isinstance(actor, (model.Node, + model.Relationship)) super(OperationTask, self).__init__() self.actor = actor self.name = '{name}.{actor.id}'.format(name=name, actor=actor) - self.operation_mapping = operation_mapping + self.implementation = implementation self.inputs = inputs or {} self.plugin = plugin or {} self.max_attempts = (self.workflow_context._task_max_attempts @@ -93,6 +93,12 @@ class OperationTask(BaseTask): self.runs_on = runs_on @classmethod + def _merge_inputs(cls, operation_inputs, additional_inputs=None): + final_inputs = dict((p.name, p.as_raw['value']) for p in operation_inputs) + final_inputs.update(additional_inputs or {}) + return final_inputs + + @classmethod def node_instance(cls, instance, name, inputs=None, *args, **kwargs): """ Represents a node based operation @@ -100,63 +106,105 @@ class OperationTask(BaseTask): :param instance: the node of which this operation belongs to. :param name: the name of the operation. """ - assert isinstance(instance, model.NodeInstance) - return cls._instance(instance=instance, - name=name, - operation_details=instance.node.operations[name], - inputs=inputs, - plugins=instance.node.plugins or [], - runs_on=model.Task.RUNS_ON_NODE_INSTANCE, - *args, - **kwargs) + assert isinstance(instance, model.Node) + interface_name = _get_interface_name(name) + interfaces = instance.interfaces.filter_by(name=interface_name) + if interfaces.count() > 1: + raise exceptions.TaskException( + "More than one interface with the same name {0} were found".format(name)) + elif interfaces.count() == 0: + raise exceptions.TaskException( + "No Interface with the name `{interface_name}` was found".format( + interface_name=interface_name) + ) + + operation_templates = interfaces[0].operations.filter_by(name=name) + if operation_templates.count() > 1: + raise exceptions.TaskException( + "More than one operation with the same name {0} were found".format(name)) + + elif operation_templates.count() == 0: + raise exceptions.TaskException( + "No Interface with the name `{operation_name}` was found".format( + operation_name=name) + ) + + return cls._instance( + instance=instance, + name=name, + operation_template=operation_templates[0], + plugins=instance.plugins or [], + runs_on=model.Task.RUNS_ON_NODE_INSTANCE, + inputs=cls._merge_inputs(operation_templates[0].inputs, inputs), + *args, + **kwargs) @classmethod - def relationship_instance(cls, instance, name, operation_end, inputs=None, *args, **kwargs): + def relationship_instance(cls, instance, name, inputs=None, *args, **kwargs): """ Represents a relationship based operation :param instance: the relationship of which this operation belongs to. :param name: the name of the operation. - :param operation_end: source or target end of the relationship, this corresponds directly - with 'source_operations' and 'target_operations' :param inputs any additional inputs to the operation """ - assert isinstance(instance, model.RelationshipInstance) - if operation_end not in [cls.TARGET_OPERATION, cls.SOURCE_OPERATION]: - raise exceptions.TaskException('The operation end should be {0} or {1}'.format( - cls.TARGET_OPERATION, cls.SOURCE_OPERATION - )) - operation_details = getattr(instance.relationship, operation_end)[name] + assert isinstance(instance, model.Relationship) + operation_name, operation_end = name.rsplit('_', 1) + interface_name = _get_interface_name(operation_name) + interfaces = getattr(instance, operation_end + '_interfaces').filter_by(name=interface_name) + if interfaces.count() > 1: + raise exceptions.TaskException( + "More than one interface with the same name {0} found".format(interface_name)) + elif interfaces.count() == 0: + raise exceptions.TaskException( + "No Interface with the name `{interface_name}` was found".format( + interface_name=interface_name) + ) + + operations = interfaces.all()[0].operations.filter_by(name=operation_name) + if operations.count() > 1: + raise exceptions.TaskException( + "More than one operation with the same name {0} found".format(name)) + elif operations.count() == 0: + raise exceptions.TaskException( + "No Operation with the name `{operation_name}` was found".format( + operation_name=operation_name) + ) + if operation_end == cls.SOURCE_OPERATION: - plugins = instance.relationship.source_node.plugins + plugins = instance.source_node.plugins runs_on = model.Task.RUNS_ON_SOURCE + else: - plugins = instance.relationship.target_node.plugins + plugins = instance.target_node.plugins runs_on = model.Task.RUNS_ON_TARGET return cls._instance(instance=instance, name=name, - operation_details=operation_details, - inputs=inputs, + operation_template=operations[0], plugins=plugins or [], runs_on=runs_on, + inputs=cls._merge_inputs(operations[0].inputs, inputs), *args, **kwargs) @classmethod - def _instance(cls, instance, name, operation_details, inputs, plugins, runs_on, *args, + def _instance(cls, + instance, + name, + operation_template, + inputs, + plugins, + runs_on, + *args, **kwargs): - operation_mapping = operation_details.get('operation') - operation_inputs = operation_details.get('inputs', {}) - operation_inputs.update(inputs or {}) - plugin_name = operation_details.get('plugin') - matching_plugins = [p for p in plugins if p['name'] == plugin_name] + matching_plugins = [p for p in plugins if p['name'] == operation_template.plugin] # All matching plugins should have identical package_name/package_version, so it's safe to # take the first found. plugin = matching_plugins[0] if matching_plugins else {} return cls(actor=instance, name=name, - operation_mapping=operation_mapping, - inputs=operation_inputs, + implementation=operation_template.implementation, + inputs=inputs, plugin=plugin, runs_on=runs_on, *args, @@ -197,3 +245,7 @@ class StubTask(BaseTask): Enables creating empty tasks. """ pass + + +def _get_interface_name(operation_name): + return operation_name.rsplit('.', 1)[0] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index fd83614..55b4159 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -23,7 +23,7 @@ from datetime import datetime import networkx from aria import logger -from aria.storage import model +from aria.storage.modeling import model from aria.orchestrator import events from .. import exceptions http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py index f65fc0d..d0e1363 100644 --- a/aria/orchestrator/workflows/core/task.py +++ b/aria/orchestrator/workflows/core/task.py @@ -24,7 +24,7 @@ from functools import ( ) from aria import logger -from aria.storage import model +from aria.storage.modeling import model from aria.orchestrator.context import operation as operation_context from .. import exceptions @@ -109,11 +109,11 @@ class OperationTask(BaseTask): model_storage = api_task._workflow_context.model base_task_model = model_storage.task.model_cls - if isinstance(api_task.actor, model.NodeInstance): - context_class = operation_context.NodeOperationContext + if isinstance(api_task.actor, model.Node): + context_cls = operation_context.NodeOperationContext task_model_cls = base_task_model.as_node_instance - elif isinstance(api_task.actor, model.RelationshipInstance): - context_class = operation_context.RelationshipOperationContext + elif isinstance(api_task.actor, model.Relationship): + context_cls = operation_context.RelationshipOperationContext task_model_cls = base_task_model.as_relationship_instance else: raise RuntimeError('No operation context could be created for {actor.model_cls}' @@ -127,7 +127,7 @@ class OperationTask(BaseTask): # package_name and package_version operation_task = task_model_cls( name=api_task.name, - operation_mapping=api_task.operation_mapping, + implementation=api_task.implementation, instance=api_task.actor, inputs=api_task.inputs, status=base_task_model.PENDING, @@ -141,13 +141,13 @@ class OperationTask(BaseTask): ) self._workflow_context.model.task.put(operation_task) - self._ctx = context_class(name=api_task.name, - model_storage=self._workflow_context.model, - resource_storage=self._workflow_context.resource, - deployment_id=self._workflow_context._deployment_id, - task_id=operation_task.id, - actor_id=api_task.actor.id, - workdir=self._workflow_context._workdir) + self._ctx = context_cls(name=api_task.name, + model_storage=self._workflow_context.model, + resource_storage=self._workflow_context.resource, + service_instance_id=self._workflow_context._service_instance_id, + task_id=operation_task.id, + actor_id=api_task.actor.id, + workdir=self._workflow_context._workdir) self._task_id = operation_task.id self._update_fields = None http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 7d990fa..96e6b46 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -19,8 +19,8 @@ Subprocess based executor # pylint: disable=wrong-import-position -import sys import os +import sys # As part of the process executor implementation, subprocess are started with this module as their # entry point. We thus remove this module's directory from the python path if it happens to be @@ -47,7 +47,7 @@ from aria.utils import exceptions from aria.orchestrator.workflows.executor import base from aria.orchestrator.context import serialization from aria.storage import instrumentation -from aria.storage import type as storage_type +from aria.storage.modeling import type as storage_type _IS_WIN = os.name == 'nt' @@ -190,7 +190,7 @@ class ProcessExecutor(base.BaseExecutor): def _create_arguments_dict(self, task): return { 'task_id': task.id, - 'operation_mapping': task.operation_mapping, + 'implementation': task.implementation, 'operation_inputs': task.inputs, 'port': self._server_port, 'context': serialization.operation_context_to_dict(task.context), @@ -281,9 +281,9 @@ def _patch_session(ctx, messenger, instrument): if not ctx.model: return - # We arbitrarily select the ``node_instance`` mapi to extract the session from it. + # We arbitrarily select the ``node`` mapi to extract the session from it. # could have been any other mapi just as well - session = ctx.model.node_instance._session + session = ctx.model.node._session original_refresh = session.refresh def patched_refresh(target): @@ -317,7 +317,7 @@ def _main(): messenger = _Messenger(task_id=task_id, port=port) messenger.started() - operation_mapping = arguments['operation_mapping'] + implementation = arguments['implementation'] operation_inputs = arguments['operation_inputs'] context_dict = arguments['context'] @@ -329,7 +329,7 @@ def _main(): try: ctx = serialization.operation_context_from_dict(context_dict) _patch_session(ctx=ctx, messenger=messenger, instrument=instrument) - task_func = imports.load_attribute(operation_mapping) + task_func = imports.load_attribute(implementation) aria.install_aria_extensions() for decorate in process_executor.decorate(): task_func = decorate(task_func) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py index 1a6ad9f..7ae0217 100644 --- a/aria/orchestrator/workflows/executor/thread.py +++ b/aria/orchestrator/workflows/executor/thread.py @@ -57,7 +57,7 @@ class ThreadExecutor(BaseExecutor): task = self._queue.get(timeout=1) self._task_started(task) try: - task_func = imports.load_attribute(task.operation_mapping) + task_func = imports.load_attribute(task.implementation) task_func(ctx=task.context, **task.inputs) self._task_succeeded(task) except BaseException as e: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/parser/modeling/__init__.py ---------------------------------------------------------------------- diff --git a/aria/parser/modeling/__init__.py b/aria/parser/modeling/__init__.py index a1efd9f..d507a26 100644 --- a/aria/parser/modeling/__init__.py +++ b/aria/parser/modeling/__init__.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .exceptions import CannotEvaluateFunctionException from .context import IdType, ModelingContext from .elements import Element, ModelElement, Function, Parameter, Metadata from .instance_elements import (ServiceInstance, Node, Capability, Relationship, Artifact, Group, @@ -24,10 +23,9 @@ from .model_elements import (ServiceModel, NodeTemplate, RequirementTemplate, Ca GroupPolicyTemplate, GroupPolicyTriggerTemplate, MappingTemplate, SubstitutionTemplate, InterfaceTemplate, OperationTemplate) from .types import TypeHierarchy, Type, RelationshipType, PolicyType, PolicyTriggerType -from .storage import initialize_storage +from .exceptions import CannotEvaluateFunctionException __all__ = ( - 'CannotEvaluateFunctionException', 'IdType', 'ModelingContext', 'Element', @@ -67,4 +65,5 @@ __all__ = ( 'RelationshipType', 'PolicyType', 'PolicyTriggerType', - 'initialize_storage') + 'CannotEvaluateFunctionException' +) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/parser/modeling/storage.py ---------------------------------------------------------------------- diff --git a/aria/parser/modeling/storage.py b/aria/parser/modeling/storage.py deleted file mode 100644 index 46c3a7c..0000000 --- a/aria/parser/modeling/storage.py +++ /dev/null @@ -1,224 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from datetime import datetime -from threading import RLock - -from ...storage import model -from ...orchestrator import operation -from ...utils.console import puts, Colored -from ...utils.formatting import safe_repr - - -def initialize_storage(context, model_storage, deployment_id): - blueprint = create_blueprint(context) - model_storage.blueprint.put(blueprint) - - deployment = create_deployment(context, blueprint, deployment_id) - model_storage.deployment.put(deployment) - - # Create nodes and node instances - for node_template in context.modeling.model.node_templates.itervalues(): - node = create_node(context, deployment, node_template) - model_storage.node.put(node) - - for a_node in context.modeling.instance.find_nodes(node_template.name): - node_instance = create_node_instance(node, a_node) - model_storage.node_instance.put(node_instance) - - # Create relationships - for node_template in context.modeling.model.node_templates.itervalues(): - for index, requirement_template in enumerate(node_template.requirement_templates): - # We are currently limited only to requirements for specific node templates! - if requirement_template.target_node_template_name: - source = model_storage.node.get_by_name(node_template.name) - target = model_storage.node.get_by_name( - requirement_template.target_node_template_name) - relationship = create_relationship(context, source, target, - requirement_template.relationship_template) - model_storage.relationship.put(relationship) - - for node in context.modeling.instance.find_nodes(node_template.name): - for relationship_model in node.relationships: - if relationship_model.source_requirement_index == index: - source_instance = \ - model_storage.node_instance.get_by_name(node.id) - target_instance = \ - model_storage.node_instance.get_by_name( - relationship_model.target_node_id) - relationship_instance = \ - create_relationship_instance(relationship, source_instance, - target_instance) - model_storage.relationship_instance.put(relationship_instance) - - -def create_blueprint(context): - now = datetime.utcnow() - main_file_name = unicode(context.presentation.location) - try: - name = context.modeling.model.metadata.values.get('template_name') - except AttributeError: - name = None - return model.Blueprint( - plan={}, - name=name or main_file_name, - description=context.modeling.model.description or '', - created_at=now, - updated_at=now, - main_file_name=main_file_name) - - -def create_deployment(context, blueprint, deployment_id): - now = datetime.utcnow() - return model.Deployment( - name='%s_%s' % (blueprint.name, deployment_id), - blueprint_fk=blueprint.id, - description=context.modeling.instance.description or '', - created_at=now, - updated_at=now, - workflows={}, - inputs={}, - groups={}, - permalink='', - policy_triggers={}, - policy_types={}, - outputs={}, - scaling_groups={}) - - -def create_node(context, deployment, node_template): - operations = create_operations(context, node_template.interface_templates, '_dry_node') - return model.Node( - name=node_template.name, - type=node_template.type_name, - type_hierarchy=[], - number_of_instances=node_template.default_instances, - planned_number_of_instances=node_template.default_instances, - deploy_number_of_instances=node_template.default_instances, - properties={}, - operations=operations, - min_number_of_instances=node_template.min_instances, - max_number_of_instances=node_template.max_instances or 100, - deployment_fk=deployment.id) - - -def create_relationship(context, source, target, relationship_template): - if relationship_template: - source_operations = create_operations(context, - relationship_template.source_interface_templates, - '_dry_relationship') - target_operations = create_operations(context, - relationship_template.target_interface_templates, - '_dry_relationship') - else: - source_operations = {} - target_operations = {} - return model.Relationship( - source_node_fk=source.id, - target_node_fk=target.id, - source_interfaces={}, - source_operations=source_operations, - target_interfaces={}, - target_operations=target_operations, - type='rel_type', - type_hierarchy=[], - properties={}) - - -def create_node_instance(node, node_model): - return model.NodeInstance( - name=node_model.id, - runtime_properties={}, - version=None, - node_fk=node.id, - state='', - scaling_groups=[]) - - -def create_relationship_instance(relationship, source_instance, target_instance): - return model.RelationshipInstance( - relationship_fk=relationship.id, - source_node_instance_fk=source_instance.id, - target_node_instance_fk=target_instance.id) - - -def create_operations(context, interfaces, fn_name): - operations = {} - for interface in interfaces.itervalues(): - operations[interface.type_name] = {} - for oper in interface.operation_templates.itervalues(): - name = '%s.%s' % (interface.type_name, oper.name) - operations[name] = { - 'operation': '%s.%s' % (__name__, fn_name), - 'inputs': { - '_plugin': None, - '_implementation': None}} - if oper.implementation: - plugin, implementation = _parse_implementation(context, oper.implementation) - operations[name]['inputs']['_plugin'] = plugin - operations[name]['inputs']['_implementation'] = implementation - - return operations - - -def _parse_implementation(context, implementation): - index = implementation.find('>') - if index == -1: - return 'execution', implementation - plugin = implementation[:index].strip() - - # TODO: validation should happen in parser - if (plugin != 'execution') and (_get_plugin(context, plugin) is None): - raise ValueError('unknown plugin: "%s"' % plugin) - - implementation = implementation[index+1:].strip() - return plugin, implementation - - -def _get_plugin(context, plugin_name): - def is_plugin(type_name): - return context.modeling.policy_types.get_role(type_name) == 'plugin' - - for policy in context.modeling.instance.policies.itervalues(): - if (policy.name == plugin_name) and is_plugin(policy.type_name): - return policy - - return None - - -_TERMINAL_LOCK = RLock() - - -@operation -def _dry_node(ctx, _plugin, _implementation, **kwargs): - with _TERMINAL_LOCK: - print '> node instance: %s' % Colored.red(ctx.node_instance.name) - _dump_implementation(_plugin, _implementation) - - -@operation -def _dry_relationship(ctx, _plugin, _implementation, **kwargs): - with _TERMINAL_LOCK: - puts('> relationship instance: %s -> %s' % ( - Colored.red(ctx.relationship_instance.source_node_instance.name), - Colored.red(ctx.relationship_instance.target_node_instance.name))) - _dump_implementation(_plugin, _implementation) - - -def _dump_implementation(plugin, implementation): - if plugin: - print ' plugin: %s' % Colored.magenta(plugin) - if implementation: - print ' implementation: %s' % Colored.yellow(safe_repr(implementation)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/parser/modeling/utils.py ---------------------------------------------------------------------- diff --git a/aria/parser/modeling/utils.py b/aria/parser/modeling/utils.py index 906106e..21db433 100644 --- a/aria/parser/modeling/utils.py +++ b/aria/parser/modeling/utils.py @@ -57,7 +57,7 @@ def coerce_value(context, container, value, report_issues=False): return [coerce_value(context, container, v, report_issues) for v in value] elif isinstance(value, dict): return OrderedDict((k, coerce_value(context, container, v, report_issues)) - for k, v in value.iteritems()) + for k, v in value.items()) elif hasattr(value, '_evaluate'): try: value = value._evaluate(context, container) @@ -73,7 +73,7 @@ def coerce_value(context, container, value, report_issues=False): def validate_dict_values(context, the_dict): if not the_dict: return - validate_list_values(context, the_dict.itervalues()) + validate_list_values(context, the_dict.values()) def validate_list_values(context, the_list): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/storage/__init__.py ---------------------------------------------------------------------- diff --git a/aria/storage/__init__.py b/aria/storage/__init__.py index a1c07d7..eaadc7e 100644 --- a/aria/storage/__init__.py +++ b/aria/storage/__init__.py @@ -42,14 +42,18 @@ from .core import ( ModelStorage, ResourceStorage, ) +from .modeling import ( + structure, + model, + model_base, + type +) from . import ( exceptions, api, - structure, core, filesystem_rapi, sql_mapi, - model ) __all__ = ( @@ -60,5 +64,8 @@ __all__ = ( 'ResourceStorage', 'filesystem_rapi', 'sql_mapi', - 'api' + 'api', + 'model', + 'model_base', + 'type', ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/storage/api.py ---------------------------------------------------------------------- diff --git a/aria/storage/api.py b/aria/storage/api.py index d6fc3b8..da0a64e 100644 --- a/aria/storage/api.py +++ b/aria/storage/api.py @@ -44,7 +44,7 @@ class ModelAPI(StorageAPI): """ super(ModelAPI, self).__init__(**kwargs) self._model_cls = model_cls - self._name = name or generate_lower_name(model_cls) + self._name = name or model_cls.__modelname__ @property def name(self): @@ -177,6 +177,4 @@ def generate_lower_name(model_cls): :return: lower name :rtype: basestring """ - return ''.join( - character if character.islower() else '_{0}'.format(character.lower()) - for character in model_cls.__name__)[1:] + return getattr(model_cls, '__mapiname__', model_cls.__tablename__) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/storage/base_model.py ---------------------------------------------------------------------- diff --git a/aria/storage/base_model.py b/aria/storage/base_model.py deleted file mode 100644 index f7d0e5b..0000000 --- a/aria/storage/base_model.py +++ /dev/null @@ -1,757 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Aria's storage.models module -Path: aria.storage.models - -models module holds aria's models. - -classes: - * Field - represents a single field. - * IterField - represents an iterable field. - * Model - abstract model implementation. - * Snapshot - snapshots implementation model. - * Deployment - deployment implementation model. - * DeploymentUpdateStep - deployment update step implementation model. - * DeploymentUpdate - deployment update implementation model. - * DeploymentModification - deployment modification implementation model. - * Execution - execution implementation model. - * Node - node implementation model. - * Relationship - relationship implementation model. - * NodeInstance - node instance implementation model. - * RelationshipInstance - relationship instance implementation model. - * Plugin - plugin implementation model. -""" -from collections import namedtuple -from datetime import datetime - -from sqlalchemy.ext.associationproxy import association_proxy -from sqlalchemy.ext.declarative import declared_attr -from sqlalchemy import ( - Column, - Integer, - Text, - DateTime, - Boolean, - Enum, - String, - Float, - orm, -) -from sqlalchemy.ext.orderinglist import ordering_list - -from ..orchestrator.exceptions import TaskAbortException, TaskRetryException -from .structure import ModelMixin -from .type import ( - List, - Dict -) - -__all__ = ( - 'BlueprintBase', - 'DeploymentBase', - 'DeploymentUpdateStepBase', - 'DeploymentUpdateBase', - 'DeploymentModificationBase', - 'ExecutionBase', - 'NodeBase', - 'RelationshipBase', - 'NodeInstanceBase', - 'RelationshipInstanceBase', - 'PluginBase', - 'TaskBase' -) - -#pylint: disable=no-self-argument, abstract-method - - -class BlueprintBase(ModelMixin): - """ - Blueprint model representation. - """ - __tablename__ = 'blueprints' - - created_at = Column(DateTime, nullable=False, index=True) - main_file_name = Column(Text, nullable=False) - plan = Column(Dict, nullable=False) - updated_at = Column(DateTime) - description = Column(Text) - - -class DeploymentBase(ModelMixin): - """ - Deployment model representation. - """ - __tablename__ = 'deployments' - - _private_fields = ['blueprint_fk'] - - created_at = Column(DateTime, nullable=False, index=True) - description = Column(Text) - inputs = Column(Dict) - groups = Column(Dict) - permalink = Column(Text) - policy_triggers = Column(Dict) - policy_types = Column(Dict) - outputs = Column(Dict) - scaling_groups = Column(Dict) - updated_at = Column(DateTime) - workflows = Column(Dict) - - @declared_attr - def blueprint_fk(cls): - return cls.foreign_key(BlueprintBase, nullable=False) - - @declared_attr - def blueprint(cls): - return cls.one_to_many_relationship('blueprint_fk') - - @declared_attr - def blueprint_name(cls): - return association_proxy('blueprint', cls.name_column_name()) - - -class ExecutionBase(ModelMixin): - """ - Execution model representation. - """ - # Needed only for pylint. the id will be populated by sqlalcehmy and the proper column. - __tablename__ = 'executions' - _private_fields = ['deployment_fk'] - - TERMINATED = 'terminated' - FAILED = 'failed' - CANCELLED = 'cancelled' - PENDING = 'pending' - STARTED = 'started' - CANCELLING = 'cancelling' - FORCE_CANCELLING = 'force_cancelling' - - STATES = [TERMINATED, FAILED, CANCELLED, PENDING, STARTED, CANCELLING, FORCE_CANCELLING] - END_STATES = [TERMINATED, FAILED, CANCELLED] - ACTIVE_STATES = [state for state in STATES if state not in END_STATES] - - VALID_TRANSITIONS = { - PENDING: [STARTED, CANCELLED], - STARTED: END_STATES + [CANCELLING], - CANCELLING: END_STATES + [FORCE_CANCELLING] - } - - @orm.validates('status') - def validate_status(self, key, value): - """Validation function that verifies execution status transitions are OK""" - try: - current_status = getattr(self, key) - except AttributeError: - return - valid_transitions = self.VALID_TRANSITIONS.get(current_status, []) - if all([current_status is not None, - current_status != value, - value not in valid_transitions]): - raise ValueError('Cannot change execution status from {current} to {new}'.format( - current=current_status, - new=value)) - return value - - created_at = Column(DateTime, index=True) - started_at = Column(DateTime, nullable=True, index=True) - ended_at = Column(DateTime, nullable=True, index=True) - error = Column(Text, nullable=True) - is_system_workflow = Column(Boolean, nullable=False, default=False) - parameters = Column(Dict) - status = Column(Enum(*STATES, name='execution_status'), default=PENDING) - workflow_name = Column(Text) - - @declared_attr - def blueprint(cls): - return association_proxy('deployment', 'blueprint') - - @declared_attr - def deployment_fk(cls): - return cls.foreign_key(DeploymentBase, nullable=True) - - @declared_attr - def deployment(cls): - return cls.one_to_many_relationship('deployment_fk') - - @declared_attr - def deployment_name(cls): - return association_proxy('deployment', cls.name_column_name()) - - @declared_attr - def blueprint_name(cls): - return association_proxy('deployment', 'blueprint_name') - - def __str__(self): - return '<{0} id=`{1}` (status={2})>'.format( - self.__class__.__name__, - getattr(self, self.name_column_name()), - self.status - ) - - -class DeploymentUpdateBase(ModelMixin): - """ - Deployment update model representation. - """ - # Needed only for pylint. the id will be populated by sqlalcehmy and the proper column. - steps = None - - __tablename__ = 'deployment_updates' - - _private_fields = ['execution_fk', 'deployment_fk'] - - created_at = Column(DateTime, nullable=False, index=True) - deployment_plan = Column(Dict, nullable=False) - deployment_update_node_instances = Column(Dict) - deployment_update_deployment = Column(Dict) - deployment_update_nodes = Column(List) - modified_entity_ids = Column(Dict) - state = Column(Text) - - @declared_attr - def execution_fk(cls): - return cls.foreign_key(ExecutionBase, nullable=True) - - @declared_attr - def execution(cls): - return cls.one_to_many_relationship('execution_fk') - - @declared_attr - def execution_name(cls): - return association_proxy('execution', cls.name_column_name()) - - @declared_attr - def deployment_fk(cls): - return cls.foreign_key(DeploymentBase) - - @declared_attr - def deployment(cls): - return cls.one_to_many_relationship('deployment_fk') - - @declared_attr - def deployment_name(cls): - return association_proxy('deployment', cls.name_column_name()) - - def to_dict(self, suppress_error=False, **kwargs): - dep_update_dict = super(DeploymentUpdateBase, self).to_dict(suppress_error) #pylint: disable=no-member - # Taking care of the fact the DeploymentSteps are _BaseModels - dep_update_dict['steps'] = [step.to_dict() for step in self.steps] - return dep_update_dict - - -class DeploymentUpdateStepBase(ModelMixin): - """ - Deployment update step model representation. - """ - # Needed only for pylint. the id will be populated by sqlalcehmy and the proper column. - __tablename__ = 'deployment_update_steps' - _private_fields = ['deployment_update_fk'] - - _action_types = namedtuple('ACTION_TYPES', 'ADD, REMOVE, MODIFY') - ACTION_TYPES = _action_types(ADD='add', REMOVE='remove', MODIFY='modify') - _entity_types = namedtuple( - 'ENTITY_TYPES', - 'NODE, RELATIONSHIP, PROPERTY, OPERATION, WORKFLOW, OUTPUT, DESCRIPTION, GROUP, ' - 'POLICY_TYPE, POLICY_TRIGGER, PLUGIN') - ENTITY_TYPES = _entity_types( - NODE='node', - RELATIONSHIP='relationship', - PROPERTY='property', - OPERATION='operation', - WORKFLOW='workflow', - OUTPUT='output', - DESCRIPTION='description', - GROUP='group', - POLICY_TYPE='policy_type', - POLICY_TRIGGER='policy_trigger', - PLUGIN='plugin' - ) - - action = Column(Enum(*ACTION_TYPES, name='action_type'), nullable=False) - entity_id = Column(Text, nullable=False) - entity_type = Column(Enum(*ENTITY_TYPES, name='entity_type'), nullable=False) - - @declared_attr - def deployment_update_fk(cls): - return cls.foreign_key(DeploymentUpdateBase) - - @declared_attr - def deployment_update(cls): - return cls.one_to_many_relationship('deployment_update_fk', backreference='steps') - - @declared_attr - def deployment_update_name(cls): - return association_proxy('deployment_update', cls.name_column_name()) - - def __hash__(self): - return hash((getattr(self, self.id_column_name()), self.entity_id)) - - def __lt__(self, other): - """ - the order is 'remove' < 'modify' < 'add' - :param other: - :return: - """ - if not isinstance(other, self.__class__): - return not self >= other - - if self.action != other.action: - if self.action == 'remove': - return_value = True - elif self.action == 'add': - return_value = False - else: - return_value = other.action == 'add' - return return_value - - if self.action == 'add': - return self.entity_type == 'node' and other.entity_type == 'relationship' - if self.action == 'remove': - return self.entity_type == 'relationship' and other.entity_type == 'node' - return False - - -class DeploymentModificationBase(ModelMixin): - """ - Deployment modification model representation. - """ - __tablename__ = 'deployment_modifications' - _private_fields = ['deployment_fk'] - - STARTED = 'started' - FINISHED = 'finished' - ROLLEDBACK = 'rolledback' - - STATES = [STARTED, FINISHED, ROLLEDBACK] - END_STATES = [FINISHED, ROLLEDBACK] - - context = Column(Dict) - created_at = Column(DateTime, nullable=False, index=True) - ended_at = Column(DateTime, index=True) - modified_nodes = Column(Dict) - node_instances = Column(Dict) - status = Column(Enum(*STATES, name='deployment_modification_status')) - - @declared_attr - def deployment_fk(cls): - return cls.foreign_key(DeploymentBase) - - @declared_attr - def deployment(cls): - return cls.one_to_many_relationship('deployment_fk', backreference='modifications') - - @declared_attr - def deployment_name(cls): - return association_proxy('deployment', cls.name_column_name()) - - -class NodeBase(ModelMixin): - """ - Node model representation. - """ - __tablename__ = 'nodes' - - # See base class for an explanation on these properties - is_id_unique = False - - _private_fields = ['blueprint_fk', 'host_fk'] - - @declared_attr - def host_fk(cls): - return cls.foreign_key(NodeBase, nullable=True) - - @declared_attr - def host(cls): - return cls.relationship_to_self('host_fk') - - @declared_attr - def host_name(cls): - return association_proxy('host', cls.name_column_name()) - - @declared_attr - def deployment_fk(cls): - return cls.foreign_key(DeploymentBase) - - @declared_attr - def deployment(cls): - return cls.one_to_many_relationship('deployment_fk') - - @declared_attr - def deployment_name(cls): - return association_proxy('deployment', cls.name_column_name()) - - @declared_attr - def blueprint_name(cls): - return association_proxy('deployment', 'blueprint_{0}'.format(cls.name_column_name())) - - deploy_number_of_instances = Column(Integer, nullable=False) - max_number_of_instances = Column(Integer, nullable=False) - min_number_of_instances = Column(Integer, nullable=False) - number_of_instances = Column(Integer, nullable=False) - planned_number_of_instances = Column(Integer, nullable=False) - plugins = Column(List) - properties = Column(Dict) - operations = Column(Dict) - type = Column(Text, nullable=False, index=True) - type_hierarchy = Column(List) - - -class RelationshipBase(ModelMixin): - """ - Relationship model representation. - """ - __tablename__ = 'relationships' - - _private_fields = ['source_node_fk', 'target_node_fk', 'source_position', 'target_position'] - - source_position = Column(Integer) - target_position = Column(Integer) - - @declared_attr - def deployment_id(self): - return association_proxy('source_node', 'deployment_id') - - @declared_attr - def source_node_fk(cls): - return cls.foreign_key(NodeBase) - - @declared_attr - def source_node(cls): - return cls.one_to_many_relationship( - 'source_node_fk', - backreference='outbound_relationships', - backref_kwargs=dict( - order_by=cls.source_position, - collection_class=ordering_list('source_position', count_from=0) - ) - ) - - @declared_attr - def source_name(cls): - return association_proxy('source_node', cls.name_column_name()) - - @declared_attr - def target_node_fk(cls): - return cls.foreign_key(NodeBase, nullable=True) - - @declared_attr - def target_node(cls): - return cls.one_to_many_relationship( - 'target_node_fk', - backreference='inbound_relationships', - backref_kwargs=dict( - order_by=cls.target_position, - collection_class=ordering_list('target_position', count_from=0) - ) - ) - - @declared_attr - def target_name(cls): - return association_proxy('target_node', cls.name_column_name()) - - source_interfaces = Column(Dict) - source_operations = Column(Dict, nullable=False) - target_interfaces = Column(Dict) - target_operations = Column(Dict, nullable=False) - type = Column(String, nullable=False) - type_hierarchy = Column(List) - properties = Column(Dict) - - -class NodeInstanceBase(ModelMixin): - """ - Node instance model representation. - """ - __tablename__ = 'node_instances' - _private_fields = ['node_fk', 'host_fk'] - - runtime_properties = Column(Dict) - scaling_groups = Column(List) - state = Column(Text, nullable=False) - version = Column(Integer, default=1) - - @declared_attr - def host_fk(cls): - return cls.foreign_key(NodeInstanceBase, nullable=True) - - @declared_attr - def host(cls): - return cls.relationship_to_self('host_fk') - - @declared_attr - def host_name(cls): - return association_proxy('host', cls.name_column_name()) - - @declared_attr - def deployment(cls): - return association_proxy('node', 'deployment') - - @declared_attr - def deployment_name(cls): - return association_proxy('node', 'deployment_name') - - @declared_attr - def node_fk(cls): - return cls.foreign_key(NodeBase, nullable=True) - - @declared_attr - def node(cls): - return cls.one_to_many_relationship('node_fk') - - @declared_attr - def node_name(cls): - return association_proxy('node', cls.name_column_name()) - - @property - def ip(self): - if not self.host_fk: - return None - host_node_instance = self.host - if 'ip' in host_node_instance.runtime_properties: # pylint: disable=no-member - return host_node_instance.runtime_properties['ip'] # pylint: disable=no-member - host_node = host_node_instance.node # pylint: disable=no-member - if 'ip' in host_node.properties: - return host_node.properties['ip'] - return None - - -class RelationshipInstanceBase(ModelMixin): - """ - Relationship instance model representation. - """ - __tablename__ = 'relationship_instances' - _private_fields = ['relationship_storage_fk', - 'source_node_instance_fk', - 'target_node_instance_fk', - 'source_position', - 'target_position'] - - source_position = Column(Integer) - target_position = Column(Integer) - - @declared_attr - def source_node_instance_fk(cls): - return cls.foreign_key(NodeInstanceBase, nullable=True) - - @declared_attr - def source_node_instance(cls): - return cls.one_to_many_relationship( - 'source_node_instance_fk', - backreference='outbound_relationship_instances', - backref_kwargs=dict( - order_by=cls.source_position, - collection_class=ordering_list('source_position', count_from=0) - ) - ) - - @declared_attr - def source_node_instance_name(cls): - return association_proxy('source_node_instance', 'node_{0}'.format(cls.name_column_name())) - - @declared_attr - def source_node_name(cls): - return association_proxy('source_node_instance', cls.name_column_name()) - - @declared_attr - def target_node_instance_fk(cls): - return cls.foreign_key(NodeInstanceBase, nullable=True) - - @declared_attr - def target_node_instance(cls): - return cls.one_to_many_relationship( - 'target_node_instance_fk', - backreference='inbound_relationship_instances', - backref_kwargs=dict( - order_by=cls.target_position, - collection_class=ordering_list('target_position', count_from=0) - ) - ) - - @declared_attr - def target_node_instance_name(cls): - return association_proxy('target_node_instance', cls.name_column_name()) - - @declared_attr - def target_node_name(cls): - return association_proxy('target_node_instance', 'node_{0}'.format(cls.name_column_name())) - - @declared_attr - def relationship_fk(cls): - return cls.foreign_key(RelationshipBase) - - @declared_attr - def relationship(cls): - return cls.one_to_many_relationship('relationship_fk') - - @declared_attr - def relationship_name(cls): - return association_proxy('relationship', cls.name_column_name()) - - - -class PluginBase(ModelMixin): - """ - Plugin model representation. - """ - __tablename__ = 'plugins' - - archive_name = Column(Text, nullable=False, index=True) - distribution = Column(Text) - distribution_release = Column(Text) - distribution_version = Column(Text) - package_name = Column(Text, nullable=False, index=True) - package_source = Column(Text) - package_version = Column(Text) - supported_platform = Column(Text) - supported_py_versions = Column(List) - uploaded_at = Column(DateTime, nullable=False, index=True) - wheels = Column(List, nullable=False) - - -class TaskBase(ModelMixin): - """ - A Model which represents an task - """ - __tablename__ = 'tasks' - _private_fields = ['node_instance_fk', 'relationship_instance_fk', 'execution_fk'] - - @declared_attr - def node_instance_fk(cls): - return cls.foreign_key(NodeInstanceBase, nullable=True) - - @declared_attr - def node_instance_name(cls): - return association_proxy('node_instance', cls.name_column_name()) - - @declared_attr - def node_instance(cls): - return cls.one_to_many_relationship('node_instance_fk') - - @declared_attr - def relationship_instance_fk(cls): - return cls.foreign_key(RelationshipInstanceBase, nullable=True) - - @declared_attr - def relationship_instance_name(cls): - return association_proxy('relationship_instance', cls.name_column_name()) - - @declared_attr - def relationship_instance(cls): - return cls.one_to_many_relationship('relationship_instance_fk') - - @declared_attr - def plugin_fk(cls): - return cls.foreign_key(PluginBase, nullable=True) - - @declared_attr - def plugin(cls): - return cls.one_to_many_relationship('plugin_fk') - - @declared_attr - def execution_fk(cls): - return cls.foreign_key(ExecutionBase, nullable=True) - - @declared_attr - def execution(cls): - return cls.one_to_many_relationship('execution_fk') - - @declared_attr - def execution_name(cls): - return association_proxy('execution', cls.name_column_name()) - - PENDING = 'pending' - RETRYING = 'retrying' - SENT = 'sent' - STARTED = 'started' - SUCCESS = 'success' - FAILED = 'failed' - STATES = ( - PENDING, - RETRYING, - SENT, - STARTED, - SUCCESS, - FAILED, - ) - - WAIT_STATES = [PENDING, RETRYING] - END_STATES = [SUCCESS, FAILED] - - RUNS_ON_SOURCE = 'source' - RUNS_ON_TARGET = 'target' - RUNS_ON_NODE_INSTANCE = 'node_instance' - RUNS_ON = (RUNS_ON_NODE_INSTANCE, RUNS_ON_SOURCE, RUNS_ON_TARGET) - - @orm.validates('max_attempts') - def validate_max_attempts(self, _, value): # pylint: disable=no-self-use - """Validates that max attempts is either -1 or a positive number""" - if value < 1 and value != TaskBase.INFINITE_RETRIES: - raise ValueError('Max attempts can be either -1 (infinite) or any positive number. ' - 'Got {value}'.format(value=value)) - return value - - INFINITE_RETRIES = -1 - - status = Column(Enum(*STATES, name='status'), default=PENDING) - - due_at = Column(DateTime, default=datetime.utcnow) - started_at = Column(DateTime, default=None) - ended_at = Column(DateTime, default=None) - max_attempts = Column(Integer, default=1) - retry_count = Column(Integer, default=0) - retry_interval = Column(Float, default=0) - ignore_failure = Column(Boolean, default=False) - - # Operation specific fields - operation_mapping = Column(String) - inputs = Column(Dict) - plugin_name = Column(String) - _runs_on = Column(Enum(*RUNS_ON, name='runs_on'), name='runs_on') - - @property - def actor(self): - """ - Return the actor of the task - :return: - """ - return self.node_instance or self.relationship_instance - - @property - def runs_on(self): - if self._runs_on == self.RUNS_ON_NODE_INSTANCE: - return self.node_instance - elif self._runs_on == self.RUNS_ON_SOURCE: - return self.relationship_instance.source_node_instance # pylint: disable=no-member - elif self._runs_on == self.RUNS_ON_TARGET: - return self.relationship_instance.target_node_instance # pylint: disable=no-member - return None - - @classmethod - def as_node_instance(cls, instance, runs_on, **kwargs): - return cls(node_instance=instance, _runs_on=runs_on, **kwargs) - - @classmethod - def as_relationship_instance(cls, instance, runs_on, **kwargs): - return cls(relationship_instance=instance, _runs_on=runs_on, **kwargs) - - @staticmethod - def abort(message=None): - raise TaskAbortException(message) - - @staticmethod - def retry(message=None, retry_interval=None): - raise TaskRetryException(message, retry_interval=retry_interval) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/storage/core.py ---------------------------------------------------------------------- diff --git a/aria/storage/core.py b/aria/storage/core.py index 94b4fe0..fe79dac 100644 --- a/aria/storage/core.py +++ b/aria/storage/core.py @@ -39,7 +39,6 @@ API: """ from aria.logger import LoggerMixin -from . import api as storage_api __all__ = ( 'Storage', @@ -105,7 +104,7 @@ class ModelStorage(Storage): :param model_cls: the model to register. :return: """ - model_name = storage_api.generate_lower_name(model_cls) + model_name = model_cls.__modelname__ if model_name in self.registered: self.logger.debug('{name} in already storage {self!r}'.format(name=model_name, self=self)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/storage/instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py index 537dbb5..57fe9bd 100644 --- a/aria/storage/instrumentation.py +++ b/aria/storage/instrumentation.py @@ -17,12 +17,11 @@ import copy import sqlalchemy.event -from . import api -from . import model as _model +from .modeling import model as _model _STUB = object() _INSTRUMENTED = { - _model.NodeInstance.runtime_properties: dict + _model.Node.runtime_properties: dict } @@ -75,7 +74,7 @@ class _Instrumentation(object): def _register_set_attribute_listener(self, instrumented_attribute, attribute_type): def listener(target, value, *_): - mapi_name = self._mapi_name(target.__class__) + mapi_name = target.__modelname__ tracked_instances = self.tracked_changes.setdefault(mapi_name, {}) tracked_attributes = tracked_instances.setdefault(target.id, {}) if value is None: @@ -90,7 +89,7 @@ class _Instrumentation(object): def _register_instance_listeners(self, instrumented_class, instrumented_attributes): def listener(target, *_): - mapi_name = self._mapi_name(instrumented_class) + mapi_name = instrumented_class.__modelname__ tracked_instances = self.tracked_changes.setdefault(mapi_name, {}) tracked_attributes = tracked_instances.setdefault(target.id, {}) for attribute_name, attribute_type in instrumented_attributes.items(): @@ -110,7 +109,7 @@ class _Instrumentation(object): def clear(self, target=None): if target: - mapi_name = self._mapi_name(target.__class__) + mapi_name = target.__modelname__ tracked_instances = self.tracked_changes.setdefault(mapi_name, {}) tracked_instances.pop(target.id, None) else: @@ -128,10 +127,6 @@ class _Instrumentation(object): def __exit__(self, exc_type, exc_val, exc_tb): self.restore() - @staticmethod - def _mapi_name(instrumented_class): - return api.generate_lower_name(instrumented_class) - class _Value(object): # You may wonder why is this a full blown class and not a named tuple. The reason is that http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/storage/model.py ---------------------------------------------------------------------- diff --git a/aria/storage/model.py b/aria/storage/model.py deleted file mode 100644 index afca3e4..0000000 --- a/aria/storage/model.py +++ /dev/null @@ -1,110 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Aria's storage.models module -Path: aria.storage.models - -models module holds aria's models. - -classes: - * Field - represents a single field. - * IterField - represents an iterable field. - * Model - abstract model implementation. - * Snapshot - snapshots implementation model. - * Deployment - deployment implementation model. - * DeploymentUpdateStep - deployment update step implementation model. - * DeploymentUpdate - deployment update implementation model. - * DeploymentModification - deployment modification implementation model. - * Execution - execution implementation model. - * Node - node implementation model. - * Relationship - relationship implementation model. - * NodeInstance - node instance implementation model. - * RelationshipInstance - relationship instance implementation model. - * ProviderContext - provider context implementation model. - * Plugin - plugin implementation model. -""" -from sqlalchemy.ext.declarative import declarative_base - -from . import structure -from . import base_model as base - -__all__ = ( - 'Blueprint', - 'Deployment', - 'DeploymentUpdateStep', - 'DeploymentUpdate', - 'DeploymentModification', - 'Execution', - 'Node', - 'Relationship', - 'NodeInstance', - 'RelationshipInstance', - 'Plugin', -) - - -#pylint: disable=abstract-method -# The required abstract method implementation are implemented in the ModelIDMixin, which is used as -# a base to the DeclerativeBase. -DeclarativeBase = declarative_base(cls=structure.ModelIDMixin) - - -class Blueprint(DeclarativeBase, base.BlueprintBase): - pass - - -class Deployment(DeclarativeBase, base.DeploymentBase): - pass - - -class Execution(DeclarativeBase, base.ExecutionBase): - pass - - -class DeploymentUpdate(DeclarativeBase, base.DeploymentUpdateBase): - pass - - -class DeploymentUpdateStep(DeclarativeBase, base.DeploymentUpdateStepBase): - pass - - -class DeploymentModification(DeclarativeBase, base.DeploymentModificationBase): - pass - - -class Node(DeclarativeBase, base.NodeBase): - pass - - -class Relationship(DeclarativeBase, base.RelationshipBase): - pass - - -class NodeInstance(DeclarativeBase, base.NodeInstanceBase): - pass - - -class RelationshipInstance(DeclarativeBase, base.RelationshipInstanceBase): - pass - - -class Plugin(DeclarativeBase, base.PluginBase): - pass - - -class Task(DeclarativeBase, base.TaskBase): - pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55556793/aria/storage/modeling/__init__.py ---------------------------------------------------------------------- diff --git a/aria/storage/modeling/__init__.py b/aria/storage/modeling/__init__.py new file mode 100644 index 0000000..1d5936e --- /dev/null +++ b/aria/storage/modeling/__init__.py @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections import namedtuple + +from . import ( + model, + instance_elements as _instance_base, + orchestrator_elements as _orchestrator_base, + template_elements as _template_base, +) +from .model import DB as declarative_base + +_ModelBaseCls = namedtuple('ModelBase', 'instance_elements,' + 'orchestrator_elements,' + 'template_elements') +model_base = _ModelBaseCls(instance_elements=_instance_base, + orchestrator_elements=_orchestrator_base, + template_elements=_template_base) + +__all__ = ( + 'model', + 'model_base', + 'declarative_base' +)
