reordered storage
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/3ac35f63 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/3ac35f63 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/3ac35f63 Branch: refs/heads/ARIA-39-Genericize-storage-models Commit: 3ac35f63d1636b7a8ebfa931de6cb8880c24c055 Parents: 89dfdb6 Author: mxmrlv <[email protected]> Authored: Thu Dec 15 12:16:34 2016 +0200 Committer: mxmrlv <[email protected]> Committed: Thu Dec 15 12:16:34 2016 +0200 ---------------------------------------------------------------------- aria/__init__.py | 31 +- aria/orchestrator/context/workflow.py | 4 +- aria/orchestrator/workflows/api/task.py | 10 +- aria/orchestrator/workflows/core/engine.py | 16 +- aria/orchestrator/workflows/core/task.py | 8 +- aria/storage/__init__.py | 12 +- aria/storage/base_model.py | 671 ++++++++++++++++++ aria/storage/model.py | 108 +++ aria/storage/models.py | 111 --- aria/storage/models_base.py | 701 ------------------- aria/storage/structure.py | 179 +++++ aria/storage/structures.py | 267 ------- aria/storage/type.py | 123 ++++ tests/mock/models.py | 22 +- .../orchestrator/workflows/core/test_engine.py | 14 +- .../workflows/executor/test_executor.py | 6 +- tests/storage/__init__.py | 6 +- tests/storage/test_model_storage.py | 57 +- tests/storage/test_models.py | 32 +- 19 files changed, 1193 insertions(+), 1185 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3ac35f63/aria/__init__.py ---------------------------------------------------------------------- diff --git a/aria/__init__.py b/aria/__init__.py index b000397..04b74c7 100644 --- a/aria/__init__.py +++ b/aria/__init__.py @@ -61,22 +61,21 @@ def application_model_storage(api, api_kwargs=None): Initiate model storage for the supplied storage driver """ models = [ - 
storage.models.Plugin, - storage.models.ProviderContext, - - storage.models.Blueprint, - storage.models.Deployment, - storage.models.DeploymentUpdate, - storage.models.DeploymentUpdateStep, - storage.models.DeploymentModification, - - storage.models.Node, - storage.models.NodeInstance, - storage.models.Relationship, - storage.models.RelationshipInstance, - - storage.models.Execution, - storage.models.Task, + storage.model.Plugin, + + storage.model.Blueprint, + storage.model.Deployment, + storage.model.DeploymentUpdate, + storage.model.DeploymentUpdateStep, + storage.model.DeploymentModification, + + storage.model.Node, + storage.model.NodeInstance, + storage.model.Relationship, + storage.model.RelationshipInstance, + + storage.model.Execution, + storage.model.Task, ] # if api not in _model_storage: return storage.ModelStorage(api, items=models, api_kwargs=api_kwargs or {}) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3ac35f63/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py index f5ba86b..1365888 100644 --- a/aria/orchestrator/context/workflow.py +++ b/aria/orchestrator/context/workflow.py @@ -76,7 +76,7 @@ class WorkflowContext(BaseContext): """ return self.model.node.iter( filters={ - 'deployment_id': self.deployment.id + 'deployment_id': getattr(self.deployment, self.deployment.user_id_column()) } ) @@ -87,7 +87,7 @@ class WorkflowContext(BaseContext): """ return self.model.node_instance.iter( filters={ - 'deployment_id': getattr(self.deployment, self.deployment.user_id_column) + 'deployment_id': getattr(self.deployment, self.deployment.user_id_column()) } ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3ac35f63/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py 
b/aria/orchestrator/workflows/api/task.py index 1c12407..93c2142 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -18,7 +18,7 @@ Provides the tasks to be entered into the task graph """ from uuid import uuid4 -from aria.storage import models +from aria.storage import model from ... import context from .. import exceptions @@ -75,8 +75,8 @@ class OperationTask(BaseTask): :param actor: the operation host on which this operation is registered. :param inputs: operation inputs. """ - assert isinstance(actor, (models.NodeInstance, - models.RelationshipInstance)) + assert isinstance(actor, (model.NodeInstance, + model.RelationshipInstance)) super(OperationTask, self).__init__() self.actor = actor self.name = '{name}.{actor.id}'.format(name=name, actor=actor) @@ -97,7 +97,7 @@ class OperationTask(BaseTask): :param instance: the node of which this operation belongs to. :param name: the name of the operation. """ - assert isinstance(instance, models.NodeInstance) + assert isinstance(instance, model.NodeInstance) operation_details = instance.node.operations[name] operation_inputs = operation_details.get('inputs', {}) operation_inputs.update(inputs or {}) @@ -119,7 +119,7 @@ class OperationTask(BaseTask): with 'source_operations' and 'target_operations' :param inputs any additional inputs to the operation """ - assert isinstance(instance, models.RelationshipInstance) + assert isinstance(instance, model.RelationshipInstance) if operation_end not in [cls.TARGET_OPERATION, cls.SOURCE_OPERATION]: raise exceptions.TaskException('The operation end should be {0} or {1}'.format( cls.TARGET_OPERATION, cls.SOURCE_OPERATION http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3ac35f63/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 2f0b3f1..7b57ae4 100644 
--- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -23,7 +23,7 @@ from datetime import datetime import networkx from aria import logger -from aria.storage import models +from aria.storage import model from aria.orchestrator import events from .. import exceptions @@ -80,18 +80,18 @@ class Engine(logger.LoggerMixin): events.on_cancelling_workflow_signal.send(self._workflow_context) def _is_cancel(self): - return self._workflow_context.execution.status in [models.Execution.CANCELLING, - models.Execution.CANCELLED] + return self._workflow_context.execution.status in [model.Execution.CANCELLING, + model.Execution.CANCELLED] def _executable_tasks(self): now = datetime.utcnow() return (task for task in self._tasks_iter() - if task.status in models.Task.WAIT_STATES and + if task.status in model.Task.WAIT_STATES and task.due_at <= now and not self._task_has_dependencies(task)) def _ended_tasks(self): - return (task for task in self._tasks_iter() if task.status in models.Task.END_STATES) + return (task for task in self._tasks_iter() if task.status in model.Task.END_STATES) def _task_has_dependencies(self, task): return len(self._execution_graph.pred.get(task.id, {})) > 0 @@ -103,19 +103,19 @@ class Engine(logger.LoggerMixin): for _, data in self._execution_graph.nodes_iter(data=True): task = data['task'] if isinstance(task, engine_task.OperationTask): - if task.model_task.status not in models.Task.END_STATES: + if task.model_task.status not in model.Task.END_STATES: self._workflow_context.model.task.refresh(task.model_task) yield task def _handle_executable_task(self, task): if isinstance(task, engine_task.StubTask): - task.status = models.Task.SUCCESS + task.status = model.Task.SUCCESS else: events.sent_task_signal.send(task) self._executor.execute(task) def _handle_ended_tasks(self, task): - if task.status == models.Task.FAILED and not task.ignore_failure: + if task.status == model.Task.FAILED and not task.ignore_failure: 
raise exceptions.ExecutorException('Workflow failed') else: self._execution_graph.remove_node(task.id) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3ac35f63/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py index d381916..ad4bf0a 100644 --- a/aria/orchestrator/workflows/core/task.py +++ b/aria/orchestrator/workflows/core/task.py @@ -24,7 +24,7 @@ from functools import ( ) from aria import logger -from aria.storage import models +from aria.storage import model from aria.orchestrator.context import operation as operation_context from .. import exceptions @@ -66,7 +66,7 @@ class StubTask(BaseTask): def __init__(self, *args, **kwargs): super(StubTask, self).__init__(*args, **kwargs) - self.status = models.Task.PENDING + self.status = model.Task.PENDING self.due_at = datetime.utcnow() @@ -108,10 +108,10 @@ class OperationTask(BaseTask): self._workflow_context = api_task._workflow_context base_task_model = api_task._workflow_context.model.task.model_cls - if isinstance(api_task.actor, models.NodeInstance): + if isinstance(api_task.actor, model.NodeInstance): context_class = operation_context.NodeOperationContext task_model_cls = base_task_model.as_node_instance - elif isinstance(api_task.actor, models.RelationshipInstance): + elif isinstance(api_task.actor, model.RelationshipInstance): context_class = operation_context.RelationshipOperationContext task_model_cls = base_task_model.as_relationship_instance else: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3ac35f63/aria/storage/__init__.py ---------------------------------------------------------------------- diff --git a/aria/storage/__init__.py b/aria/storage/__init__.py index fd69d47..a1c07d7 100644 --- a/aria/storage/__init__.py +++ b/aria/storage/__init__.py @@ -45,19 +45,19 @@ from .core import ( from . 
import ( exceptions, api, - structures, + structure, core, filesystem_rapi, sql_mapi, - models + model ) __all__ = ( 'exceptions', - 'structures', - # 'Storage', - # 'ModelStorage', - # 'ResourceStorage', + 'structure', + 'Storage', + 'ModelStorage', + 'ResourceStorage', 'filesystem_rapi', 'sql_mapi', 'api' http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3ac35f63/aria/storage/base_model.py ---------------------------------------------------------------------- diff --git a/aria/storage/base_model.py b/aria/storage/base_model.py new file mode 100644 index 0000000..77c351d --- /dev/null +++ b/aria/storage/base_model.py @@ -0,0 +1,671 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Aria's storage.models module +Path: aria.storage.models + +models module holds aria's models. + +classes: + * Field - represents a single field. + * IterField - represents an iterable field. + * Model - abstract model implementation. + * Snapshot - snapshots implementation model. + * Deployment - deployment implementation model. + * DeploymentUpdateStep - deployment update step implementation model. + * DeploymentUpdate - deployment update implementation model. + * DeploymentModification - deployment modification implementation model. 
+ * Execution - execution implementation model. + * Node - node implementation model. + * Relationship - relationship implementation model. + * NodeInstance - node instance implementation model. + * RelationshipInstance - relationship instance implementation model. + * Plugin - plugin implementation model. +""" +from collections import namedtuple +from datetime import datetime + +from sqlalchemy.ext.associationproxy import association_proxy +from sqlalchemy.ext.declarative import declared_attr +from sqlalchemy import ( + Column, + Integer, + Text, + DateTime, + Boolean, + Enum, + String, + Float, + orm, +) + +from .structure import ModelBase + +from .type import ( + List, + Dict +) + +__all__ = ( + 'BlueprintBase', + 'DeploymentBase', + 'DeploymentUpdateStepBase', + 'DeploymentUpdateBase', + 'DeploymentModificationBase', + 'ExecutionBase', + 'NodeBase', + 'RelationshipBase', + 'NodeInstanceBase', + 'RelationshipInstanceBase', + 'PluginBase', + 'TaskBase' +) + +#pylint: disable=no-self-argument + + +class BlueprintBase(ModelBase): + """ + Blueprint model representation. + """ + __tablename__ = 'blueprints' + + created_at = Column(DateTime, nullable=False, index=True) + main_file_name = Column(Text, nullable=False) + plan = Column(Dict, nullable=False) + updated_at = Column(DateTime) + description = Column(Text) + + +class DeploymentBase(ModelBase): + """ + Deployment model representation. 
+ """ + __tablename__ = 'deployments' + + _private_fields = ['blueprint_fk'] + + created_at = Column(DateTime, nullable=False, index=True) + description = Column(Text) + inputs = Column(Dict) + groups = Column(Dict) + permalink = Column(Text) + policy_triggers = Column(Dict) + policy_types = Column(Dict) + outputs = Column(Dict) + scaling_groups = Column(List) + updated_at = Column(DateTime) + workflows = Column(List) + + @declared_attr + def blueprint_fk(cls): + return cls.foreign_key('blueprints', nullable=False) + + @declared_attr + def blueprint(cls): + return cls.one_to_many_relationship('blueprint_fk', 'Blueprint') + + @declared_attr + def blueprint_id(cls): + return association_proxy('blueprint', cls.user_id_column()) + + +class ExecutionBase(ModelBase): + """ + Execution model representation. + """ + # Needed only for pylint. the id will be populated by sqlalcehmy and the proper column. + id = None + + __tablename__ = 'executions' + + TERMINATED = 'terminated' + FAILED = 'failed' + CANCELLED = 'cancelled' + PENDING = 'pending' + STARTED = 'started' + CANCELLING = 'cancelling' + FORCE_CANCELLING = 'force_cancelling' + + STATES = [TERMINATED, FAILED, CANCELLED, PENDING, STARTED, CANCELLING, FORCE_CANCELLING] + END_STATES = [TERMINATED, FAILED, CANCELLED] + ACTIVE_STATES = [state for state in STATES if state not in END_STATES] + + VALID_TRANSITIONS = { + PENDING: [STARTED, CANCELLED], + STARTED: END_STATES + [CANCELLING], + CANCELLING: END_STATES + } + + @orm.validates('status') + def validate_status(self, key, value): + """Validation function that verifies execution status transitions are OK""" + try: + current_status = getattr(self, key) + except AttributeError: + return + valid_transitions = ExecutionBase.VALID_TRANSITIONS.get(current_status, []) + if all([current_status is not None, + current_status != value, + value not in valid_transitions]): + raise ValueError('Cannot change execution status from {current} to {new}'.format( + current=current_status, + 
new=value)) + return value + + _private_fields = ['deployment_id', 'blueprint_id'] + + created_at = Column(DateTime, index=True) + started_at = Column(DateTime, nullable=True, index=True) + ended_at = Column(DateTime, nullable=True, index=True) + error = Column(Text, nullable=True) + is_system_workflow = Column(Boolean, nullable=False, default=False) + parameters = Column(Dict) + status = Column(Enum(*STATES, name='execution_status'), default=PENDING) + workflow_name = Column(Text) + + blueprint = association_proxy('deployment', 'blueprint') + blueprint_id = association_proxy('deployment', 'blueprint_id') + + @declared_attr + def deployment_fk(cls): + return cls.foreign_key('deployments', nullable=False) + + @declared_attr + def deployment(cls): + return cls.one_to_many_relationship('deployment_fk', 'Deployment') + + @declared_attr + def deployment_id(cls): + return association_proxy('deployment', cls.user_id_column()) + + def __str__(self): + return '<{0} id=`{1}` (status={2})>'.format( + self.__class__.__name__, + self.id, + self.status + ) + + +class DeploymentUpdateBase(ModelBase): + """ + Deployment update model representation. + """ + # Needed only for pylint. the id will be populated by sqlalcehmy and the proper column. 
+ steps = None + + __tablename__ = 'deployment_updates' + + _private_fields = ['execution_id', 'deployment_id'] + + created_at = Column(DateTime, nullable=False, index=True) + deployment_plan = Column(Dict, nullable=False) + deployment_update_node_instances = Column(Dict) + deployment_update_deployment = Column(Dict) + deployment_update_nodes = Column(Dict) + modified_entity_ids = Column(Dict) + state = Column(Text) + + @declared_attr + def execution_fk(cls): + return cls.foreign_key('executions', nullable=True) + + @declared_attr + def execution_id(cls): + return association_proxy('executions', cls.user_id_column()) + + @declared_attr + def execution(cls): + return cls.one_to_many_relationship('execution_fk', 'Execution') + + @declared_attr + def deployment_fk(cls): + return cls.foreign_key('deployments') + + @declared_attr + def deployment_id(cls): + return association_proxy('deployment', cls.user_id_column()) + + @declared_attr + def deployment(cls): + return cls.one_to_many_relationship('deployment_fk', 'Deployment') + + def to_dict(self, suppress_error=False, **kwargs): + dep_update_dict = super(DeploymentUpdateBase, self).to_dict(suppress_error) #pylint: disable=no-member + # Taking care of the fact the DeploymentSteps are _BaseModels + dep_update_dict['steps'] = [step.to_dict() for step in self.steps] + return dep_update_dict + + +class DeploymentUpdateStepBase(ModelBase): + """ + Deployment update step model representation. + """ + # Needed only for pylint. the id will be populated by sqlalcehmy and the proper column. 
+ id = None + + __tablename__ = 'deployment_update_steps' + + _action_types = namedtuple('ACTION_TYPES', 'ADD, REMOVE, MODIFY') + ACTION_TYPES = _action_types(ADD='add', REMOVE='remove', MODIFY='modify') + _entity_types = namedtuple( + 'ENTITY_TYPES', + 'NODE, RELATIONSHIP, PROPERTY, OPERATION, WORKFLOW, OUTPUT, DESCRIPTION, GROUP, ' + 'POLICY_TYPE, POLICY_TRIGGER, PLUGIN') + ENTITY_TYPES = _entity_types( + NODE='node', + RELATIONSHIP='relationship', + PROPERTY='property', + OPERATION='operation', + WORKFLOW='workflow', + OUTPUT='output', + DESCRIPTION='description', + GROUP='group', + POLICY_TYPE='policy_type', + POLICY_TRIGGER='policy_trigger', + PLUGIN='plugin' + ) + + _private_fields = ['deployment_update_id'] + + action = Column(Enum(*ACTION_TYPES, name='action_type'), nullable=False) + entity_id = Column(Text, nullable=False) + entity_type = Column(Enum(*ENTITY_TYPES, name='entity_type'), nullable=False) + + @declared_attr + def deployment_update_fk(cls): + return cls.foreign_key('deployment_updates') + + @declared_attr + def deployment_update_id(cls): + return association_proxy('deployment_updates', cls.user_id_column()) + + @declared_attr + def deployment_update(cls): + return cls.one_to_many_relationship('deployment_update_fk', + 'DeploymentUpdate', + backreference='steps') + + def __hash__(self): + return hash((self.id, self.entity_id)) + + def __lt__(self, other): + """ + the order is 'remove' < 'modify' < 'add' + :param other: + :return: + """ + if not isinstance(other, self.__class__): + return not self >= other + + if self.action != other.action: + if self.action == 'remove': + return_value = True + elif self.action == 'add': + return_value = False + else: + return_value = other.action == 'add' + return return_value + + if self.action == 'add': + return self.entity_type == 'node' and other.entity_type == 'relationship' + if self.action == 'remove': + return self.entity_type == 'relationship' and other.entity_type == 'node' + return False + + +class 
DeploymentModificationBase(ModelBase): + """ + Deployment modification model representation. + """ + __tablename__ = 'deployment_modifications' + + STARTED = 'started' + FINISHED = 'finished' + ROLLEDBACK = 'rolledback' + + STATES = [STARTED, FINISHED, ROLLEDBACK] + END_STATES = [FINISHED, ROLLEDBACK] + + _private_fields = ['deployment_id'] + + context = Column(Dict) + created_at = Column(DateTime, nullable=False, index=True) + ended_at = Column(DateTime, index=True) + modified_nodes = Column(Dict) + node_instances = Column(Dict) + status = Column(Enum(*STATES, name='deployment_modification_status')) + + @declared_attr + def deployment_id(cls): + return association_proxy('deployment', cls.user_id_column()) + + @declared_attr + def deployment_fk(cls): + return cls.foreign_key('deployments') + + @declared_attr + def deployment(cls): + return cls.one_to_many_relationship('deployment_fk', + 'Deployment', + backreference='modifications') + + +class NodeBase(ModelBase): + """ + Node model representation. 
+ """ + __tablename__ = 'nodes' + + # See base class for an explanation on these properties + is_id_unique = False + + _private_fields = ['deployment_id', 'host_id'] + + @declared_attr + def host_id(cls): + return association_proxy('host', cls.user_id_column()) + + @declared_attr + def host_fk(cls): + return cls.foreign_key('nodes', nullable=True) + + @declared_attr + def host(cls): + return cls.relationship_to_self('host_fk') + + @declared_attr + def deployment_id(cls): + return association_proxy('deployment', cls.user_id_column()) + + @declared_attr + def deployment_fk(cls): + return cls.foreign_key('deployments') + + @declared_attr + def deployment(cls): + return cls.one_to_many_relationship('deployment_fk', 'Deployment') + + deploy_number_of_instances = Column(Integer, nullable=False) + max_number_of_instances = Column(Integer, nullable=False) + min_number_of_instances = Column(Integer, nullable=False) + number_of_instances = Column(Integer, nullable=False) + planned_number_of_instances = Column(Integer, nullable=False) + plugins = Column(List) + plugins_to_install = Column(Dict) + properties = Column(Dict) + operations = Column(Dict) + type = Column(Text, nullable=False, index=True) + type_hierarchy = Column(List) + + +class RelationshipBase(ModelBase): + """ + Relationship model representation. 
+ """ + __tablename__ = 'relationships' + + _private_fields = ['source_node_id', 'target_node_id'] + + @declared_attr + def source_id(cls): + return association_proxy('source_node', cls.user_id_column()) + + @declared_attr + def source_node_fk(cls): + return cls.foreign_key('nodes') + + @declared_attr + def source_node(cls): + return cls.one_to_many_relationship('source_node_fk', + 'Node', + backreference='outbound_relationships') + @declared_attr + def target_name(cls): + return association_proxy('target_node', cls.user_id_column()) + + @declared_attr + def target_node_fk(cls): + return cls.foreign_key('nodes') + + @declared_attr + def target_node(cls): + return cls.one_to_many_relationship('target_node_fk', + 'Node', + backreference='inbound_relationships') + + source_interfaces = Column(Dict) + source_operations = Column(Dict, nullable=False) + target_interfaces = Column(Dict) + target_operations = Column(Dict, nullable=False) + type = Column(String, nullable=False) + type_hierarchy = Column(List) + properties = Column(Dict) + + +class NodeInstanceBase(ModelBase): + """ + Node instance model representation. 
+ """ + __tablename__ = 'node_instances' + _private_fields = ['node_id', 'host_id'] + + runtime_properties = Column(Dict) + scaling_groups = Column(List) + state = Column(Text, nullable=False) + version = Column(Integer, default=1) + + @declared_attr + def host_id(cls): + return association_proxy('host', cls.user_id_column()) + + @declared_attr + def host_fk(cls): + return cls.foreign_key('node_instances', nullable=True) + + @declared_attr + def host(cls): + return cls.relationship_to_self('host_fk') + + deployment = association_proxy('node', 'deployment') + deployment_id = association_proxy('node', 'deployment_id') + deployment_name = association_proxy('node', 'deployment_name') + + @declared_attr + def node_id(cls): + return association_proxy('node', cls.user_id_column()) + + @declared_attr + def node_fk(cls): + return cls.foreign_key('nodes', nullable=True) + + @declared_attr + def node(cls): + return cls.one_to_many_relationship('node_fk', 'Node') + + +class RelationshipInstanceBase(ModelBase): + """ + Relationship instance model representation. 
+ """ + __tablename__ = 'relationship_instances' + _private_fields = ['relationship_storage_id', + 'source_node_instance_id', + 'target_node_instance_id'] + + @declared_attr + def source_node_instance_id(cls): + return association_proxy('source_node_instance', cls.user_id_column()) + + @declared_attr + def source_node_instance_fk(cls): + return cls.foreign_key('node_instances') + + @declared_attr + def source_node_instance(cls): + return cls.one_to_many_relationship('source_node_instance_fk', + 'NodeInstance', + backreference='outbound_relationship_instances') + + @declared_attr + def target_node_instance_id(cls): + return association_proxy('target_node_instance', cls.user_id_column()) + + @declared_attr + def target_node_instance_fk(cls): + return cls.foreign_key('node_instances') + + @declared_attr + def target_node_instance(cls): + return cls.one_to_many_relationship('target_node_instance_fk', + 'NodeInstance', + backreference='inbound_relationship_instances') + + @declared_attr + def relationship_fk(cls): + return cls.foreign_key('relationships') + + @declared_attr + def relationship_id(cls): + return association_proxy('relationship', cls.user_id_column()) + + @declared_attr + def relationship(cls): + return cls.one_to_many_relationship('relationship_fk', 'Relationship') + + +class PluginBase(ModelBase): + """ + Plugin model representation. 
+ """ + __tablename__ = 'plugins' + + archive_name = Column(Text, nullable=False, index=True) + distribution = Column(Text) + distribution_release = Column(Text) + distribution_version = Column(Text) + excluded_wheels = Column(Dict) + package_name = Column(Text, nullable=False, index=True) + package_source = Column(Text) + package_version = Column(Text) + supported_platform = Column(Dict) + supported_py_versions = Column(Dict) + uploaded_at = Column(DateTime, nullable=False, index=True) + wheels = Column(Dict, nullable=False) + + +class TaskBase(ModelBase): + """ + A Model which represents an task + """ + __tablename__ = 'tasks' + _private_fields = ['node_instance_id', + 'relationship_instance_id', + 'execution_id'] + + @declared_attr + def node_instance_fk(cls): + return cls.foreign_key('node_instances', nullable=True) + + @declared_attr + def node_instance_id(cls): + return association_proxy('node_instance', cls.user_id_column()) + + @declared_attr + def node_instance(cls): + return cls.one_to_many_relationship('node_instance_fk', 'NodeInstance') + + @declared_attr + def relationship_instance_fk(cls): + return cls.foreign_key('relationship_instances', nullable=True) + + @declared_attr + def relationship_instance_id(cls): + return association_proxy('relationship_instance', cls.user_id_column()) + + @declared_attr + def relationship_instance(cls): + return cls.one_to_many_relationship('relationship_instance_fk', 'RelationshipInstance') + + PENDING = 'pending' + RETRYING = 'retrying' + SENT = 'sent' + STARTED = 'started' + SUCCESS = 'success' + FAILED = 'failed' + STATES = ( + PENDING, + RETRYING, + SENT, + STARTED, + SUCCESS, + FAILED, + ) + + WAIT_STATES = [PENDING, RETRYING] + END_STATES = [SUCCESS, FAILED] + + @orm.validates('max_attempts') + def validate_max_attempts(self, _, value): # pylint: disable=no-self-use + """Validates that max attempts is either -1 or a positive number""" + if value < 1 and value != TaskBase.INFINITE_RETRIES: + raise ValueError('Max 
attempts can be either -1 (infinite) or any positive number. ' + 'Got {value}'.format(value=value)) + return value + + INFINITE_RETRIES = -1 + + status = Column(Enum(*STATES), name='status', default=PENDING) + + due_at = Column(DateTime, default=datetime.utcnow) + started_at = Column(DateTime, default=None) + ended_at = Column(DateTime, default=None) + max_attempts = Column(Integer, default=1) + retry_count = Column(Integer, default=0) + retry_interval = Column(Float, default=0) + ignore_failure = Column(Boolean, default=False) + + # Operation specific fields + operation_mapping = Column(String) + inputs = Column(Dict) + + @declared_attr + def execution_fk(cls): + return cls.foreign_key('executions', nullable=True) + + @declared_attr + def execution_id(cls): + return association_proxy('execution', cls.user_id_column()) + + @declared_attr + def execution(cls): + return cls.one_to_many_relationship('execution_fk', 'Execution') + + @property + def actor(self): + """ + Return the actor of the task + :return: + ` """ + return self.node_instance or self.relationship_instance + + @classmethod + def as_node_instance(cls, instance_fk, **kwargs): + return cls(node_instance_fk=instance_fk, **kwargs) + + @classmethod + def as_relationship_instance(cls, instance_fk, **kwargs): + return cls(relationship_instance_fk=instance_fk, **kwargs) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3ac35f63/aria/storage/model.py ---------------------------------------------------------------------- diff --git a/aria/storage/model.py b/aria/storage/model.py new file mode 100644 index 0000000..52f2612 --- /dev/null +++ b/aria/storage/model.py @@ -0,0 +1,108 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Aria's storage.models module +Path: aria.storage.models + +models module holds aria's models. + +classes: + * Field - represents a single field. + * IterField - represents an iterable field. + * Model - abstract model implementation. + * Snapshot - snapshots implementation model. + * Deployment - deployment implementation model. + * DeploymentUpdateStep - deployment update step implementation model. + * DeploymentUpdate - deployment update implementation model. + * DeploymentModification - deployment modification implementation model. + * Execution - execution implementation model. + * Node - node implementation model. + * Relationship - relationship implementation model. + * NodeInstance - node instance implementation model. + * RelationshipInstance - relationship instance implementation model. + * ProviderContext - provider context implementation model. + * Plugin - plugin implementation model. +""" + +from sqlalchemy.ext.declarative import declarative_base + +from . import structure +from . 
import base_model as base + +__all__ = ( + 'Blueprint', + 'Deployment', + 'DeploymentUpdateStep', + 'DeploymentUpdate', + 'DeploymentModification', + 'Execution', + 'Node', + 'Relationship', + 'NodeInstance', + 'RelationshipInstance', + 'Plugin', +) + + +DeclarativeBase = declarative_base(cls=(structure.ARIADeclarativeBase, structure.ModelIdMixin)) + + +class Blueprint(DeclarativeBase, base.BlueprintBase): + pass + + +class Deployment(DeclarativeBase, base.DeploymentBase): + pass + + +class Execution(DeclarativeBase, base.ExecutionBase): + pass + + +class DeploymentUpdate(DeclarativeBase, base.DeploymentUpdateBase): + pass + + +class DeploymentUpdateStep(DeclarativeBase, base.DeploymentUpdateStepBase): + pass + + +class DeploymentModification(DeclarativeBase, base.DeploymentModificationBase): + pass + + +class Node(DeclarativeBase, base.NodeBase): + pass + + +class Relationship(DeclarativeBase, base.RelationshipBase): + pass + + +class NodeInstance(DeclarativeBase, base.NodeInstanceBase): + pass + + +class RelationshipInstance(DeclarativeBase, base.RelationshipInstanceBase): + pass + + +class Plugin(DeclarativeBase, base.PluginBase): + pass + + +class Task(DeclarativeBase, base.TaskBase): + pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3ac35f63/aria/storage/models.py ---------------------------------------------------------------------- diff --git a/aria/storage/models.py b/aria/storage/models.py deleted file mode 100644 index 96af8cc..0000000 --- a/aria/storage/models.py +++ /dev/null @@ -1,111 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Aria's storage.models module -Path: aria.storage.models - -models module holds aria's models. - -classes: - * Field - represents a single field. - * IterField - represents an iterable field. - * Model - abstract model implementation. - * Snapshot - snapshots implementation model. - * Deployment - deployment implementation model. - * DeploymentUpdateStep - deployment update step implementation model. - * DeploymentUpdate - deployment update implementation model. - * DeploymentModification - deployment modification implementation model. - * Execution - execution implementation model. - * Node - node implementation model. - * Relationship - relationship implementation model. - * NodeInstance - node instance implementation model. - * RelationshipInstance - relationship instance implementation model. - * ProviderContext - provider context implementation model. - * Plugin - plugin implementation model. -""" - -from . import structures -from . 
import models_base as base - -__all__ = ( - 'Blueprint', - 'Deployment', - 'DeploymentUpdateStep', - 'DeploymentUpdate', - 'DeploymentModification', - 'Execution', - 'Node', - 'Relationship', - 'NodeInstance', - 'RelationshipInstance', - 'ProviderContext', - 'Plugin', -) - - -DeclarativeBase = structures.declarative_base(structures.ModelBase) - - -class Blueprint(base.ModelCommon, base.BlueprintBase, DeclarativeBase): - pass - - -class Deployment(base.ModelCommon, base.DeploymentBase, DeclarativeBase): - pass - - -class Execution(base.ModelCommon, base.ExecutionBase, DeclarativeBase): - pass - - -class DeploymentUpdate(base.ModelCommon, base.DeploymentUpdateBase, DeclarativeBase): - pass - - -class DeploymentUpdateStep(base.ModelCommon, base.DeploymentUpdateStepBase, DeclarativeBase): - pass - - -class DeploymentModification(base.ModelCommon, base.DeploymentModificationBase, DeclarativeBase): - pass - - -class Node(base.ModelCommon, base.NodeBase, DeclarativeBase): - pass - - -class Relationship(base.ModelCommon, base.RelationshipBase, DeclarativeBase): - pass - - -class NodeInstance(base.ModelCommon, base.NodeInstanceBase, DeclarativeBase): - pass - - -class RelationshipInstance(base.ModelCommon, base.RelationshipInstanceBase, DeclarativeBase): - pass - - -class ProviderContext(base.ModelCommon, base.ProviderContextBase, DeclarativeBase): - pass - - -class Plugin(base.ModelCommon, base.PluginBase, DeclarativeBase): - pass - - -class Task(base.ModelCommon, base.TaskBase, DeclarativeBase): - pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3ac35f63/aria/storage/models_base.py ---------------------------------------------------------------------- diff --git a/aria/storage/models_base.py b/aria/storage/models_base.py deleted file mode 100644 index 95ad01e..0000000 --- a/aria/storage/models_base.py +++ /dev/null @@ -1,701 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. 
See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Aria's storage.models module -Path: aria.storage.models - -models module holds aria's models. - -classes: - * Field - represents a single field. - * IterField - represents an iterable field. - * Model - abstract model implementation. - * Snapshot - snapshots implementation model. - * Deployment - deployment implementation model. - * DeploymentUpdateStep - deployment update step implementation model. - * DeploymentUpdate - deployment update implementation model. - * DeploymentModification - deployment modification implementation model. - * Execution - execution implementation model. - * Node - node implementation model. - * Relationship - relationship implementation model. - * NodeInstance - node instance implementation model. - * RelationshipInstance - relationship instance implementation model. - * ProviderContext - provider context implementation model. - * Plugin - plugin implementation model. 
-""" -from collections import namedtuple -from datetime import datetime - -from sqlalchemy.ext.associationproxy import association_proxy - -from .structures import ( - Column, - Integer, - Text, - DateTime, - Boolean, - Enum, - String, - Float, - List, - Dict, - foreign_key, - one_to_many_relationship, - relationship_to_self, - orm, - declared_attr, -) - -__all__ = ( - 'ModelCommon', - 'BlueprintBase', - 'DeploymentBase', - 'DeploymentUpdateStepBase', - 'DeploymentUpdateBase', - 'DeploymentModificationBase', - 'ExecutionBase', - 'NodeBase', - 'RelationshipBase', - 'NodeInstanceBase', - 'RelationshipInstanceBase', - 'ProviderContextBase', - 'PluginBase', - 'TaskBase' -) - -#pylint: disable=no-self-argument - - -class ModelCommon(object): - id = Column(Integer, primary_key=True, autoincrement=True) - storage_id_column = 'id' - user_id_column = 'name' - - -class BlueprintBase(object): - """ - Blueprint model representation. - """ - __tablename__ = 'blueprints' - - name = Column(Text, index=True) - created_at = Column(DateTime, nullable=False, index=True) - main_file_name = Column(Text, nullable=False) - plan = Column(Dict, nullable=False) - updated_at = Column(DateTime) - description = Column(Text) - - -class DeploymentBase(object): - """ - Deployment model representation. 
- """ - __tablename__ = 'deployments' - - _private_fields = ['blueprint_fk'] - - name = Column(Text, index=True) - created_at = Column(DateTime, nullable=False, index=True) - description = Column(Text) - inputs = Column(Dict) - groups = Column(Dict) - permalink = Column(Text) - policy_triggers = Column(Dict) - policy_types = Column(Dict) - outputs = Column(Dict) - scaling_groups = Column(List) - updated_at = Column(DateTime) - workflows = Column(List) - - @declared_attr - def blueprint_fk(cls): - return foreign_key(cls, 'blueprints', nullable=False) - - @declared_attr - def blueprint(cls): - return one_to_many_relationship(cls, 'blueprint_fk', 'Blueprint') - - @declared_attr - def blueprint_id(cls): - return association_proxy('blueprint', cls.user_id_column) - - -class ExecutionBase(object): - """ - Execution model representation. - """ - # Needed only for pylint. the id will be populated by sqlalcehmy and the proper column. - id = None - - __tablename__ = 'executions' - - TERMINATED = 'terminated' - FAILED = 'failed' - CANCELLED = 'cancelled' - PENDING = 'pending' - STARTED = 'started' - CANCELLING = 'cancelling' - FORCE_CANCELLING = 'force_cancelling' - - STATES = [TERMINATED, FAILED, CANCELLED, PENDING, STARTED, CANCELLING, FORCE_CANCELLING] - END_STATES = [TERMINATED, FAILED, CANCELLED] - ACTIVE_STATES = [state for state in STATES if state not in END_STATES] - - VALID_TRANSITIONS = { - PENDING: [STARTED, CANCELLED], - STARTED: END_STATES + [CANCELLING], - CANCELLING: END_STATES - } - - @orm.validates('status') - def validate_status(self, key, value): - """Validation function that verifies execution status transitions are OK""" - try: - current_status = getattr(self, key) - except AttributeError: - return - valid_transitions = ExecutionBase.VALID_TRANSITIONS.get(current_status, []) - if all([current_status is not None, - current_status != value, - value not in valid_transitions]): - raise ValueError('Cannot change execution status from {current} to 
{new}'.format( - current=current_status, - new=value)) - return value - - _private_fields = ['deployment_id', 'blueprint_id'] - - created_at = Column(DateTime, index=True) - started_at = Column(DateTime, nullable=True, index=True) - ended_at = Column(DateTime, nullable=True, index=True) - error = Column(Text, nullable=True) - is_system_workflow = Column(Boolean, nullable=False, default=False) - parameters = Column(Dict) - status = Column(Enum(*STATES, name='execution_status'), default=PENDING) - workflow_name = Column(Text) - - blueprint = association_proxy('deployment', 'blueprint') - blueprint_id = association_proxy('deployment', 'blueprint_id') - - @declared_attr - def deployment_fk(cls): - return foreign_key(cls, 'deployments', nullable=False) - - @declared_attr - def deployment(cls): - return one_to_many_relationship(cls, 'deployment_fk', 'Deployment') - - @declared_attr - def deployment_id(cls): - return association_proxy('deployment', cls.user_id_column) - - def __str__(self): - return '<{0} id=`{1}` (status={2})>'.format( - self.__class__.__name__, - self.id, - self.status - ) - - -class DeploymentUpdateBase(object): - """ - Deployment update model representation. - """ - # Needed only for pylint. the id will be populated by sqlalcehmy and the proper column. 
- steps = None - - __tablename__ = 'deployment_updates' - - _private_fields = ['execution_id', 'deployment_id'] - - created_at = Column(DateTime, nullable=False, index=True) - deployment_plan = Column(Dict, nullable=False) - deployment_update_node_instances = Column(Dict) - deployment_update_deployment = Column(Dict) - deployment_update_nodes = Column(Dict) - modified_entity_ids = Column(Dict) - state = Column(Text) - - @declared_attr - def execution_fk(cls): - return foreign_key(cls, 'executions', nullable=True) - - @declared_attr - def execution_id(cls): - return association_proxy('executions', cls.user_id_column) - - @declared_attr - def execution(cls): - return one_to_many_relationship(cls, 'execution_fk', 'Execution') - - @declared_attr - def deployment_fk(cls): - return foreign_key(cls, 'deployments') - - @declared_attr - def deployment_id(cls): - return association_proxy('deployment', cls.user_id_column) - - @declared_attr - def deployment(cls): - return one_to_many_relationship(cls, 'deployment_fk', 'Deployment') - - def to_dict(self, suppress_error=False, **kwargs): - dep_update_dict = super(DeploymentUpdateBase, self).to_dict(suppress_error) #pylint: disable=no-member - # Taking care of the fact the DeploymentSteps are _BaseModels - dep_update_dict['steps'] = [step.to_dict() for step in self.steps] - return dep_update_dict - - -class DeploymentUpdateStepBase(object): - """ - Deployment update step model representation. - """ - # Needed only for pylint. the id will be populated by sqlalcehmy and the proper column. 
- id = None - - __tablename__ = 'deployment_update_steps' - - _action_types = namedtuple('ACTION_TYPES', 'ADD, REMOVE, MODIFY') - ACTION_TYPES = _action_types(ADD='add', REMOVE='remove', MODIFY='modify') - _entity_types = namedtuple( - 'ENTITY_TYPES', - 'NODE, RELATIONSHIP, PROPERTY, OPERATION, WORKFLOW, OUTPUT, DESCRIPTION, GROUP, ' - 'POLICY_TYPE, POLICY_TRIGGER, PLUGIN') - ENTITY_TYPES = _entity_types( - NODE='node', - RELATIONSHIP='relationship', - PROPERTY='property', - OPERATION='operation', - WORKFLOW='workflow', - OUTPUT='output', - DESCRIPTION='description', - GROUP='group', - POLICY_TYPE='policy_type', - POLICY_TRIGGER='policy_trigger', - PLUGIN='plugin' - ) - - _private_fields = ['deployment_update_id'] - - action = Column(Enum(*ACTION_TYPES, name='action_type'), nullable=False) - entity_id = Column(Text, nullable=False) - entity_type = Column(Enum(*ENTITY_TYPES, name='entity_type'), nullable=False) - - @declared_attr - def deployment_update_fk(cls): - return foreign_key(cls, 'deployment_updates') - - @declared_attr - def deployment_update_id(cls): - return association_proxy('deployment_updates', cls.user_id_column) - - @declared_attr - def deployment_update(cls): - return one_to_many_relationship(cls, - 'deployment_update_fk', - 'DeploymentUpdate', - backreference='steps') - - def __hash__(self): - return hash((self.id, self.entity_id)) - - def __lt__(self, other): - """ - the order is 'remove' < 'modify' < 'add' - :param other: - :return: - """ - if not isinstance(other, self.__class__): - return not self >= other - - if self.action != other.action: - if self.action == 'remove': - return_value = True - elif self.action == 'add': - return_value = False - else: - return_value = other.action == 'add' - return return_value - - if self.action == 'add': - return self.entity_type == 'node' and other.entity_type == 'relationship' - if self.action == 'remove': - return self.entity_type == 'relationship' and other.entity_type == 'node' - return False - - -class 
DeploymentModificationBase(object): - """ - Deployment modification model representation. - """ - __tablename__ = 'deployment_modifications' - - STARTED = 'started' - FINISHED = 'finished' - ROLLEDBACK = 'rolledback' - - STATES = [STARTED, FINISHED, ROLLEDBACK] - END_STATES = [FINISHED, ROLLEDBACK] - - _private_fields = ['deployment_id'] - - context = Column(Dict) - created_at = Column(DateTime, nullable=False, index=True) - ended_at = Column(DateTime, index=True) - modified_nodes = Column(Dict) - node_instances = Column(Dict) - status = Column(Enum(*STATES, name='deployment_modification_status')) - - @declared_attr - def deployment_id(cls): - return association_proxy('deployment', cls.user_id_column) - - @declared_attr - def deployment_fk(cls): - return foreign_key(cls, 'deployments') - - @declared_attr - def deployment(cls): - return one_to_many_relationship(cls, - 'deployment_fk', - 'Deployment', - backreference='modifications') - - -class NodeBase(object): - """ - Node model representation. 
- """ - __tablename__ = 'nodes' - - # See base class for an explanation on these properties - is_id_unique = False - - _private_fields = ['deployment_id', 'host_id'] - - @declared_attr - def host_id(cls): - return association_proxy('host', cls.user_id_column) - - @declared_attr - def host_fk(cls): - return foreign_key(cls, 'nodes', nullable=True) - - @declared_attr - def host(cls): - return relationship_to_self(cls, 'host_fk') - - @declared_attr - def deployment_id(cls): - return association_proxy('deployment', cls.user_id_column) - - @declared_attr - def deployment_fk(cls): - return foreign_key(cls, 'deployments') - - @declared_attr - def deployment(cls): - return one_to_many_relationship(cls, 'deployment_fk', 'Deployment') - - name = Column(Text, index=True) - deploy_number_of_instances = Column(Integer, nullable=False) - max_number_of_instances = Column(Integer, nullable=False) - min_number_of_instances = Column(Integer, nullable=False) - number_of_instances = Column(Integer, nullable=False) - planned_number_of_instances = Column(Integer, nullable=False) - plugins = Column(List) - plugins_to_install = Column(Dict) - properties = Column(Dict) - operations = Column(Dict) - type = Column(Text, nullable=False, index=True) - type_hierarchy = Column(List) - - -class RelationshipBase(object): - """ - Relationship model representation. 
- """ - __tablename__ = 'relationships' - - _private_fields = ['source_node_id', 'target_node_id'] - - @declared_attr - def source_id(cls): - return association_proxy('source_node', cls.user_id_column) - - @declared_attr - def source_node_fk(cls): - return foreign_key(cls, 'nodes') - - @declared_attr - def source_node(cls): - return one_to_many_relationship(cls, - 'source_node_fk', - 'Node', - backreference='outbound_relationships') - @declared_attr - def target_name(cls): - return association_proxy('target_node', cls.user_id_column) - - @declared_attr - def target_node_fk(cls): - return foreign_key(cls, 'nodes') - - @declared_attr - def target_node(cls): - return one_to_many_relationship(cls, - 'target_node_fk', - 'Node', - backreference='inbound_relationships') - - source_interfaces = Column(Dict) - source_operations = Column(Dict, nullable=False) - target_interfaces = Column(Dict) - target_operations = Column(Dict, nullable=False) - type = Column(String, nullable=False) - type_hierarchy = Column(List) - properties = Column(Dict) - - -class NodeInstanceBase(object): - """ - Node instance model representation. 
- """ - __tablename__ = 'node_instances' - _private_fields = ['node_id', 'host_id'] - - name = Column(Text, index=True) - runtime_properties = Column(Dict) - scaling_groups = Column(List) - state = Column(Text, nullable=False) - version = Column(Integer, default=1) - - @declared_attr - def host_id(cls): - return association_proxy('host', cls.user_id_column) - - @declared_attr - def host_fk(cls): - return foreign_key(cls, 'node_instances', nullable=True) - - @declared_attr - def host(cls): - return relationship_to_self(cls, 'host_fk') - - deployment = association_proxy('node', 'deployment') - deployment_id = association_proxy('node', 'deployment_id') - deployment_name = association_proxy('node', 'deployment_name') - - @declared_attr - def node_id(cls): - return association_proxy('node', cls.user_id_column) - - @declared_attr - def node_fk(cls): - return foreign_key(cls, 'nodes', nullable=True) - - @declared_attr - def node(cls): - return one_to_many_relationship(cls, 'node_fk', 'Node') - - -class RelationshipInstanceBase(object): - """ - Relationship instance model representation. 
- """ - __tablename__ = 'relationship_instances' - _private_fields = ['relationship_storage_id', - 'source_node_instance_id', - 'target_node_instance_id'] - - @declared_attr - def source_node_instance_id(cls): - return association_proxy('source_node_instance', cls.user_id_column) - - @declared_attr - def source_node_instance_fk(cls): - return foreign_key(cls, 'node_instances') - - @declared_attr - def source_node_instance(cls): - return one_to_many_relationship(cls, - 'source_node_instance_fk', - 'NodeInstance', - backreference='outbound_relationship_instances') - - @declared_attr - def target_node_instance_id(cls): - return association_proxy('target_node_instance', cls.user_id_column) - - @declared_attr - def target_node_instance_fk(cls): - return foreign_key(cls, 'node_instances') - - @declared_attr - def target_node_instance(cls): - return one_to_many_relationship(cls, - 'target_node_instance_fk', - 'NodeInstance', - backreference='inbound_relationship_instances') - - @declared_attr - def relationship_fk(cls): - return foreign_key(cls, 'relationships') - - @declared_attr - def relationship_id(cls): - return association_proxy('relationship', cls.user_id_column) - - @declared_attr - def relationship(cls): - return one_to_many_relationship(cls, 'relationship_fk', 'Relationship') - - -class ProviderContextBase(object): - """ - Provider context model representation. - """ - __tablename__ = 'provider_context' - - name = Column(Text, nullable=True) - id = Column(Text, nullable=False) - context = Column(Dict, nullable=False) - - -class PluginBase(object): - """ - Plugin model representation. 
- """ - __tablename__ = 'plugins' - - archive_name = Column(Text, nullable=False, index=True) - distribution = Column(Text) - distribution_release = Column(Text) - distribution_version = Column(Text) - excluded_wheels = Column(Dict) - package_name = Column(Text, nullable=False, index=True) - package_source = Column(Text) - package_version = Column(Text) - supported_platform = Column(Dict) - supported_py_versions = Column(Dict) - uploaded_at = Column(DateTime, nullable=False, index=True) - wheels = Column(Dict, nullable=False) - - -class TaskBase(object): - """ - A Model which represents an task - """ - __tablename__ = 'tasks' - _private_fields = ['node_instance_id', - 'relationship_instance_id', - 'execution_id'] - - @declared_attr - def node_instance_fk(cls): - return foreign_key(cls, 'node_instances', nullable=True) - - @declared_attr - def node_instance_id(cls): - return association_proxy('node_instance', cls.user_id_column) - - @declared_attr - def node_instance(cls): - return one_to_many_relationship(cls, 'node_instance_fk', 'NodeInstance') - - @declared_attr - def relationship_instance_fk(cls): - return foreign_key(cls, 'relationship_instances', nullable=True) - - @declared_attr - def relationship_instance_id(cls): - return association_proxy('relationship_instance', cls.user_id_column) - - @declared_attr - def relationship_instance(cls): - return one_to_many_relationship(cls, 'relationship_instance_fk', 'RelationshipInstance') - - PENDING = 'pending' - RETRYING = 'retrying' - SENT = 'sent' - STARTED = 'started' - SUCCESS = 'success' - FAILED = 'failed' - STATES = ( - PENDING, - RETRYING, - SENT, - STARTED, - SUCCESS, - FAILED, - ) - - WAIT_STATES = [PENDING, RETRYING] - END_STATES = [SUCCESS, FAILED] - - @orm.validates('max_attempts') - def validate_max_attempts(self, _, value): # pylint: disable=no-self-use - """Validates that max attempts is either -1 or a positive number""" - if value < 1 and value != TaskBase.INFINITE_RETRIES: - raise ValueError('Max 
attempts can be either -1 (infinite) or any positive number. ' - 'Got {value}'.format(value=value)) - return value - - INFINITE_RETRIES = -1 - - status = Column(Enum(*STATES), name='status', default=PENDING) - - due_at = Column(DateTime, default=datetime.utcnow) - started_at = Column(DateTime, default=None) - ended_at = Column(DateTime, default=None) - max_attempts = Column(Integer, default=1) - retry_count = Column(Integer, default=0) - retry_interval = Column(Float, default=0) - ignore_failure = Column(Boolean, default=False) - - # Operation specific fields - name = Column(String) - operation_mapping = Column(String) - inputs = Column(Dict) - - @declared_attr - def execution_fk(cls): - return foreign_key(cls, 'executions', nullable=True) - - @declared_attr - def execution_id(cls): - return association_proxy('execution', cls.user_id_column) - - @declared_attr - def execution(cls): - return one_to_many_relationship(cls, 'execution_fk', 'Execution') - - @property - def actor(self): - """ - Return the actor of the task - :return: - ` """ - return self.node_instance or self.relationship_instance - - @classmethod - def as_node_instance(cls, instance_fk, **kwargs): - return cls(node_instance_fk=instance_fk, **kwargs) - - @classmethod - def as_relationship_instance(cls, instance_fk, **kwargs): - return cls(relationship_instance_fk=instance_fk, **kwargs) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3ac35f63/aria/storage/structure.py ---------------------------------------------------------------------- diff --git a/aria/storage/structure.py b/aria/storage/structure.py new file mode 100644 index 0000000..dd757b7 --- /dev/null +++ b/aria/storage/structure.py @@ -0,0 +1,179 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Aria's storage.structures module +Path: aria.storage.structures + +models module holds aria's models. + +classes: + * Field - represents a single field. + * IterField - represents an iterable field. + * PointerField - represents a single pointer field. + * IterPointerField - represents an iterable pointers field. + * Model - abstract model implementation. +""" + +from sqlalchemy.orm import relationship, backref +from sqlalchemy import ( + Column, + ForeignKey, + Integer, + Text +) + + +class ModelBase(object): + + # overriding pylint stuff + __tablename__ = None + + @classmethod + def storage_id_column(cls): + raise NotImplementedError + + @classmethod + def user_id_column(cls): + raise NotImplementedError + + @classmethod + def _get_cls_by_tablename(cls, tablename): + """Return class reference mapped to table. + + :param tablename: String with name of table. + :return: Class reference or None. 
+ """ + if tablename in (cls.__name__, cls.__tablename__): + return cls + + for table_cls in cls._decl_class_registry.values(): + if tablename in (getattr(table_cls, '__name__', None), + getattr(table_cls, '__tablename__', None)): + return table_cls + + @classmethod + def foreign_key(cls, tablename, nullable=False): + """Return a ForeignKey object with the relevant + + :param tablename: Unique id column in the parent table + :param nullable: Should the column be allowed to remain empty + """ + table = cls._get_cls_by_tablename(tablename) + foreign_key_str = '{tablename}.{unique_id}'.format(tablename=tablename, + unique_id=table.storage_id_column()) + return Column(ForeignKey(foreign_key_str, ondelete='CASCADE'), nullable=nullable) + + @classmethod + def one_to_many_relationship(cls, + foreign_key_column, + parent_class, + backreference=None): + """Return a one-to-many SQL relationship object + Meant to be used from inside the *child* object + + :param parent_class: Class of the parent table + :param cls: Class of the child table + :param foreign_key_column: The column of the foreign key (from the child table) + :param backreference: The name to give to the reference to the child (on the parent table) + """ + parent_table = cls._get_cls_by_tablename(parent_class) + primaryjoin_str = \ + '{parent_class_name}.{parent_unique_id} == {child_class.__name__}.{foreign_key_column}'\ + .format( + parent_class_name=parent_class, + parent_unique_id=parent_table.storage_id_column(), + child_class=cls, + foreign_key_column=foreign_key_column + ) + return relationship( + parent_class, + primaryjoin=primaryjoin_str, + # The following line make sure that when the *parent* is + # deleted, all its connected children are deleted as well + backref=backref(backreference or cls.__tablename__, cascade='all') + ) + + @classmethod + def relationship_to_self(cls, local_column): + + remote_side_str = '{cls.__name__}.{remote_column}'.format( + cls=cls, + remote_column=cls.storage_id_column() + 
) + primaryjoin_str = '{remote_side_str} == {cls.__name__}.{local_column}'.format( + remote_side_str=remote_side_str, + cls=cls, + local_column=local_column) + return relationship(cls.__name__, + primaryjoin=primaryjoin_str, + remote_side=remote_side_str) + + +class ModelIdMixin(object): + id = Column(Integer, primary_key=True, autoincrement=True) + name = Column(Text, nullable=True, index=True) + + @classmethod + def storage_id_column(cls): + return 'id' + + @classmethod + def user_id_column(cls): + return 'name' + + +class ARIADeclarativeBase(object): + """ + Abstract base class for all SQL models that allows [de]serialization + """ + # This would be overridden once the models are created. Created for pylint. + __table__ = None + + _private_fields = [] + + def to_dict(self, suppress_error=False): + """Return a dict representation of the model + + :param suppress_error: If set to True, sets `None` to attributes that + it's unable to retrieve (e.g., if a relationship wasn't established + yet, and so it's impossible to access a property through it) + """ + if suppress_error: + res = dict() + for field in self.fields(): + try: + field_value = getattr(self, field) + except AttributeError: + field_value = None + res[field] = field_value + else: + # Can't simply call here `self.to_response()` because inheriting + # class might override it, but we always need the same code here + res = dict((f, getattr(self, f)) for f in self.fields()) + return res + + @classmethod + def fields(cls): + """Return the list of field names for this table + + Mostly for backwards compatibility in the code (that uses `fields`) + """ + return set(cls.__table__.columns.keys()) - set(cls._private_fields) + + def __repr__(self): + return '<{__class__.__name__} id=`{id}`>'.format( + __class__=self.__class__, + id=getattr(self, self.user_id_column())) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3ac35f63/aria/storage/structures.py 
---------------------------------------------------------------------- diff --git a/aria/storage/structures.py b/aria/storage/structures.py deleted file mode 100644 index 6949305..0000000 --- a/aria/storage/structures.py +++ /dev/null @@ -1,267 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Aria's storage.structures module -Path: aria.storage.structures - -models module holds aria's models. - -classes: - * Field - represents a single field. - * IterField - represents an iterable field. - * PointerField - represents a single pointer field. - * IterPointerField - represents an iterable pointers field. - * Model - abstract model implementation. 
-""" -import json - -from sqlalchemy.ext.mutable import Mutable -from sqlalchemy.orm import relationship, backref -# pylint: disable=unused-import -from sqlalchemy.ext.associationproxy import association_proxy -from sqlalchemy.ext.declarative import declared_attr, declarative_base -from sqlalchemy import ( - schema, - VARCHAR, - ARRAY, - Column, - Integer, - Text, - DateTime, - Boolean, - Enum, - String, - PickleType, - Float, - TypeDecorator, - ForeignKey, - orm, - Table, -) - -from aria.storage import exceptions - - -def _get_cls_by_tablename(cls, tablename): - """Return class reference mapped to table. - - :param tablename: String with name of table. - :return: Class reference or None. - """ - if tablename in (cls.__name__, cls.__tablename__): - return cls - - for table_cls in cls._decl_class_registry.values(): - if tablename in (getattr(table_cls, '__name__', None), - getattr(table_cls, '__tablename__', None)): - return table_cls - - -def foreign_key(child_class, tablename, nullable=False): - """Return a ForeignKey object with the relevant - - :param tablename: Unique id column in the parent table - :param nullable: Should the column be allowed to remain empty - """ - table = _get_cls_by_tablename(child_class, tablename) - foreign_key_str = '{tablename}.{unique_id}'.format(tablename=tablename, - unique_id=table.storage_id_column) - return Column(ForeignKey(foreign_key_str, ondelete='CASCADE'), nullable=nullable) - - -def one_to_many_relationship(child_class, - foreign_key_column, - parent_class, - backreference=None): - """Return a one-to-many SQL relationship object - Meant to be used from inside the *child* object - - :param parent_class: Class of the parent table - :param child_class: Class of the child table - :param foreign_key_column: The column of the foreign key (from the child table) - :param backreference: The name to give to the reference to the child (on the parent table) - """ - parent_table = _get_cls_by_tablename(child_class, parent_class) - 
primaryjoin_str = \ - '{parent_class_name}.{parent_unique_id} == {child_class.__name__}.{foreign_key_column}'\ - .format( - parent_class_name=parent_class, - parent_unique_id=parent_table.storage_id_column, - child_class=child_class, - foreign_key_column=foreign_key_column - ) - return relationship( - parent_class, - primaryjoin=primaryjoin_str, - # The following line make sure that when the *parent* is - # deleted, all its connected children are deleted as well - backref=backref(backreference or child_class.__tablename__, cascade='all') - ) - - -def relationship_to_self(cls, local_column): - - remote_side_str = '{cls.__name__}.{remote_column}'.format(cls=cls, - remote_column=cls.storage_id_column) - primaryjoin_str = '{remote_side_str} == {cls.__name__}.{local_column}'.format( - remote_side_str=remote_side_str, - cls=cls, - local_column=local_column) - return relationship(cls.__name__, - primaryjoin=primaryjoin_str, - remote_side=remote_side_str) - - -class ModelBase(object): - """ - Abstract base class for all SQL models that allows [de]serialization - """ - # This would be overridden once the models are created. Created for pylint. 
- __table__ = None - id = None - - _private_fields = [] - - def to_dict(self, suppress_error=False): - """Return a dict representation of the model - - :param suppress_error: If set to True, sets `None` to attributes that - it's unable to retrieve (e.g., if a relationship wasn't established - yet, and so it's impossible to access a property through it) - """ - if suppress_error: - res = dict() - for field in self.fields(): - try: - field_value = getattr(self, field) - except AttributeError: - field_value = None - res[field] = field_value - else: - # Can't simply call here `self.to_response()` because inheriting - # class might override it, but we always need the same code here - res = dict((f, getattr(self, f)) for f in self.fields()) - return res - - @classmethod - def fields(cls): - """Return the list of field names for this table - - Mostly for backwards compatibility in the code (that uses `fields`) - """ - return set(cls.__table__.columns.keys()) - set(cls._private_fields) - - def __repr__(self): - return '<{0} id=`{1}`>'.format(self.__class__.__name__, self.id) - - -class _MutableType(TypeDecorator): - """ - Dict representation of type. - """ - @property - def python_type(self): - raise NotImplementedError - - def process_literal_param(self, value, dialect): - pass - - impl = VARCHAR - - def process_bind_param(self, value, dialect): - if value is not None: - value = json.dumps(value) - return value - - def process_result_value(self, value, dialect): - if value is not None: - value = json.loads(value) - return value - - -class Dict(_MutableType): - @property - def python_type(self): - return dict - - -class List(_MutableType): - @property - def python_type(self): - return list - - -class _MutableDict(Mutable, dict): - """ - Enables tracking for dict values. - """ - @classmethod - def coerce(cls, key, value): - "Convert plain dictionaries to MutableDict." 
- - if not isinstance(value, _MutableDict): - if isinstance(value, dict): - return _MutableDict(value) - - # this call will raise ValueError - try: - return Mutable.coerce(key, value) - except ValueError as e: - raise exceptions.StorageError('SQL Storage error: {0}'.format(str(e))) - else: - return value - - def __setitem__(self, key, value): - "Detect dictionary set events and emit change events." - - dict.__setitem__(self, key, value) - self.changed() - - def __delitem__(self, key): - "Detect dictionary del events and emit change events." - - dict.__delitem__(self, key) - self.changed() - - -class _MutableList(Mutable, list): - - @classmethod - def coerce(cls, key, value): - "Convert plain dictionaries to MutableDict." - - if not isinstance(value, _MutableList): - if isinstance(value, list): - return _MutableList(value) - - # this call will raise ValueError - try: - return Mutable.coerce(key, value) - except ValueError as e: - raise exceptions.StorageError('SQL Storage error: {0}'.format(str(e))) - else: - return value - - def __setitem__(self, key, value): - list.__setitem__(self, key, value) - self.changed() - - def __delitem__(self, key): - list.__delitem__(self, key) - - -_MutableDict.associate_with(Dict) -_MutableList.as_mutable(List) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3ac35f63/aria/storage/type.py ---------------------------------------------------------------------- diff --git a/aria/storage/type.py b/aria/storage/type.py new file mode 100644 index 0000000..b168b7f --- /dev/null +++ b/aria/storage/type.py @@ -0,0 +1,123 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json + +from sqlalchemy import ( + TypeDecorator, + VARCHAR +) + +from sqlalchemy.ext import mutable + +from . import exceptions + + +class _MutableType(TypeDecorator): + """ + Dict representation of type. + """ + @property + def python_type(self): + raise NotImplementedError + + def process_literal_param(self, value, dialect): + pass + + impl = VARCHAR + + def process_bind_param(self, value, dialect): + if value is not None: + value = json.dumps(value) + return value + + def process_result_value(self, value, dialect): + if value is not None: + value = json.loads(value) + return value + + +class Dict(_MutableType): + @property + def python_type(self): + return dict + + +class List(_MutableType): + @property + def python_type(self): + return list + + +class _MutableDict(mutable.Mutable, dict): + """ + Enables tracking for dict values. + """ + @classmethod + def coerce(cls, key, value): + "Convert plain dictionaries to MutableDict." + + if not isinstance(value, _MutableDict): + if isinstance(value, dict): + return _MutableDict(value) + + # this call will raise ValueError + try: + return mutable.Mutable.coerce(key, value) + except ValueError as e: + raise exceptions.StorageError('SQL Storage error: {0}'.format(str(e))) + else: + return value + + def __setitem__(self, key, value): + "Detect dictionary set events and emit change events." + + dict.__setitem__(self, key, value) + self.changed() + + def __delitem__(self, key): + "Detect dictionary del events and emit change events." 
+ + dict.__delitem__(self, key) + self.changed() + + +class _MutableList(mutable.Mutable, list): + + @classmethod + def coerce(cls, key, value): + "Convert plain dictionaries to MutableDict." + + if not isinstance(value, _MutableList): + if isinstance(value, list): + return _MutableList(value) + + # this call will raise ValueError + try: + return mutable.Mutable.coerce(key, value) + except ValueError as e: + raise exceptions.StorageError('SQL Storage error: {0}'.format(str(e))) + else: + return value + + def __setitem__(self, key, value): + list.__setitem__(self, key, value) + self.changed() + + def __delitem__(self, key): + list.__delitem__(self, key) + + +_MutableDict.associate_with(Dict) +_MutableList.as_mutable(List) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3ac35f63/tests/mock/models.py ---------------------------------------------------------------------- diff --git a/tests/mock/models.py b/tests/mock/models.py index aeda7ca..c14f2f6 100644 --- a/tests/mock/models.py +++ b/tests/mock/models.py @@ -15,7 +15,7 @@ from datetime import datetime -from aria.storage import models +from aria.storage import model from . 
import operations @@ -35,7 +35,7 @@ RELATIONSHIP_INSTANCE_NAME = 'relationship_instance' def get_dependency_node(deployment): - return models.Node( + return model.Node( name=DEPENDENCY_NODE_NAME, type='test_node_type', type_hierarchy=[], @@ -51,7 +51,7 @@ def get_dependency_node(deployment): def get_dependency_node_instance(dependency_node): - return models.NodeInstance( + return model.NodeInstance( name=DEPENDENCY_NODE_INSTANCE_NAME, runtime_properties={'ip': '1.1.1.1'}, version=None, @@ -62,7 +62,7 @@ def get_dependency_node_instance(dependency_node): def get_relationship(source=None, target=None): - return models.Relationship( + return model.Relationship( source_node_fk=source.id, target_node_fk=target.id, source_interfaces={}, @@ -76,7 +76,7 @@ def get_relationship(source=None, target=None): def get_relationship_instance(source_instance, target_instance, relationship): - return models.RelationshipInstance( + return model.RelationshipInstance( relationship_fk=relationship.id, target_node_instance_fk=target_instance.id, source_node_instance_fk=source_instance.id, @@ -84,7 +84,7 @@ def get_relationship_instance(source_instance, target_instance, relationship): def get_dependent_node(deployment): - return models.Node( + return model.Node( name=DEPENDENT_NODE_NAME, deployment_fk=deployment.id, type='test_node_type', @@ -100,7 +100,7 @@ def get_dependent_node(deployment): def get_dependent_node_instance(dependent_node): - return models.NodeInstance( + return model.NodeInstance( name=DEPENDENT_NODE_INSTANCE_NAME, runtime_properties={}, version=None, @@ -112,7 +112,7 @@ def get_dependent_node_instance(dependent_node): def get_blueprint(): now = datetime.now() - return models.Blueprint( + return model.Blueprint( plan={}, name=BLUEPRINT_NAME, description=None, @@ -123,9 +123,9 @@ def get_blueprint(): def get_execution(deployment): - return models.Execution( + return model.Execution( deployment_fk=deployment.id, - status=models.Execution.STARTED, + 
status=model.Execution.STARTED, workflow_name=WORKFLOW_NAME, started_at=datetime.utcnow(), parameters=None @@ -134,7 +134,7 @@ def get_execution(deployment): def get_deployment(blueprint): now = datetime.utcnow() - return models.Deployment( + return model.Deployment( name=DEPLOYMENT_NAME, blueprint_fk=blueprint.id, description='', http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3ac35f63/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py index 44f7cc1..a6b55ba 100644 --- a/tests/orchestrator/workflows/core/test_engine.py +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -23,7 +23,7 @@ from aria.orchestrator import ( workflow, operation, ) -from aria.storage import models +from aria.storage import model from aria.orchestrator.workflows import ( api, exceptions, @@ -116,9 +116,7 @@ class BaseTest(object): @pytest.fixture def executor(self): - from aria.orchestrator.workflows.executor import blocking - result = blocking.CurrentThreadBlockingExecutor() - # result = thread.ThreadExecutor() + result = thread.ThreadExecutor() try: yield result finally: @@ -148,7 +146,7 @@ class TestEngine(BaseTest): execution = workflow_context.execution assert execution.started_at <= execution.ended_at <= datetime.utcnow() assert execution.error is None - assert execution.status == models.Execution.TERMINATED + assert execution.status == model.Execution.TERMINATED def test_single_task_successful_execution(self, workflow_context, executor): @workflow @@ -177,7 +175,7 @@ class TestEngine(BaseTest): execution = workflow_context.execution assert execution.started_at <= execution.ended_at <= datetime.utcnow() assert execution.error is not None - assert execution.status == models.Execution.FAILED + assert execution.status == model.Execution.FAILED def test_two_tasks_execution_order(self, 
workflow_context, executor): @workflow @@ -238,7 +236,7 @@ class TestCancel(BaseTest): execution = workflow_context.execution assert execution.started_at <= execution.ended_at <= datetime.utcnow() assert execution.error is None - assert execution.status == models.Execution.CANCELLED + assert execution.status == model.Execution.CANCELLED def test_cancel_pending_execution(self, workflow_context, executor): @workflow @@ -249,7 +247,7 @@ class TestCancel(BaseTest): executor=executor) eng.cancel_execution() execution = workflow_context.execution - assert execution.status == models.Execution.CANCELLED + assert execution.status == model.Execution.CANCELLED class TestRetries(BaseTest):
