# Origin: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d768e61/aria/storage/mapi/sql.py
# Recovered from the patch that adds aria/storage/mapi/sql.py as a new file
# (diff --git a/aria/storage/mapi/sql.py b/aria/storage/mapi/sql.py,
# new file mode 100644, index 0000000..4408aa3).
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
SQLalchemy based MAPI
"""

from sqlite3 import DatabaseError as SQLiteDBError
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.sql.elements import Label

from aria.utils.collections import OrderedDict


# psycopg2 is optional: its error type is only added to the set of errors
# caught on commit when the driver is installed.
try:
    from psycopg2 import DatabaseError as Psycopg2DBError
    sql_errors = (SQLAlchemyError, SQLiteDBError, Psycopg2DBError)
except ImportError:
    sql_errors = (SQLAlchemyError, SQLiteDBError)
    Psycopg2DBError = None

from ... import storage


DEFAULT_SQL_DIALECT = 'sqlite'


class SQLAlchemyModelAPI(storage.api.ModelAPI):
    """
    SQL based MAPI.

    Performs get/iter/store/delete/update/refresh operations for
    ``self.model_cls`` through the SQLAlchemy engine and session it is
    constructed with.
    """

    def __init__(self,
                 engine,
                 session,
                 **kwargs):
        """
        :param engine: SQLAlchemy engine, used to create and drop the table
        :param session: SQLAlchemy session, used for queries and commits
        :param kwargs: forwarded unchanged to the base class initializer
        """
        super(SQLAlchemyModelAPI, self).__init__(**kwargs)
        self._engine = engine
        self._session = session

    def get(self, entry_id, include=None, filters=None, locking=False, **kwargs):
        """Return a single result based on the model class and element ID

        :param entry_id: value matched against the ``id`` column when no
            explicit ``filters`` are given
        :param include: An optional list of column names to include
        :param filters: An optional dictionary of column name -> value (or
            list of values); when given, it replaces the ``id`` filter
        :param locking: when true, the query is issued with ``FOR UPDATE``
        :return: the first matching result
        :raises storage.exceptions.StorageError: if nothing matches
        """
        filters = filters or {'id': entry_id}
        query = self._get_query(include, filters)
        if locking:
            query = query.with_for_update()
        result = query.first()

        # `first()` yields None when there is no match; test for that
        # explicitly so a falsy-but-present result is never reported missing.
        if result is None:
            raise storage.exceptions.StorageError(
                'Requested {0} with ID `{1}` was not found'
                .format(self.model_cls.__name__, entry_id)
            )
        return result

    def iter(self,
             include=None,
             filters=None,
             pagination=None,
             sort=None,
             **kwargs):
        """Yield the (possibly zero) `model_class` results of the query

        This is a generator: the query is only executed once iteration
        starts.

        :param include: An optional list of column names to include
        :param filters: An optional dictionary of column name -> value (or
            list of values)
        :param pagination: An optional dict with size and offset keys
        :param sort: An optional dictionary of column name -> asc/desc
        """
        query = self._get_query(include, filters, sort)

        results, _, _, _ = self._paginate(query, pagination)

        for result in results:
            yield result

    def store(self, entry, **kwargs):
        """Add `entry` to the session and commit it

        :param entry: the object handed to ``session.add``
        :return: the same `entry` object
        :raises storage.exceptions.StorageError: if the commit fails
        """
        self._session.add(entry)
        self._safe_commit()
        return entry

    def delete(self, entry_id, filters=None, **kwargs):
        """Delete a single result based on the model class and element ID

        :param entry_id: ID of the element to delete
        :param filters: optional filters, forwarded to :meth:`get`
        :return: the deleted instance
        :raises storage.exceptions.StorageError: if the element is not found
            or the commit fails
        """
        try:
            instance = self.get(
                entry_id,
                filters=filters
            )
        except storage.exceptions.StorageError:
            raise storage.exceptions.StorageError(
                'Could not delete {0} with ID `{1}` - element not found'
                .format(
                    self.model_cls.__name__,
                    entry_id
                )
            )
        # Touch the join-based properties before deleting, so that they are
        # loaded on the instance that is returned.
        self._load_properties(instance)
        self._session.delete(instance)
        self._safe_commit()
        return instance

    # TODO: this might need rework
    def update(self, entry, **kwargs):
        """Add `instance` to the DB session, and attempt to commit

        :return: The updated instance
        """
        return self.store(entry)

    def refresh(self, entry):
        """Reload the instance with fresh information from the DB

        :param entry: Instance to be re-loaded from the DB
        :return: The refreshed instance
        """
        self._session.refresh(entry)
        self._load_properties(entry)
        return entry

    def _destroy_connection(self):
        pass

    def _establish_connection(self):
        pass

    def create(self):
        """
        Create the table of the model class in the storage.
        :return:
        """
        self.model_cls.__table__.create(self._engine)

    def drop(self):
        """
        Drop the table from the storage.
        :return:
        """
        self.model_cls.__table__.drop(self._engine)

    def _safe_commit(self):
        """Try to commit changes in the session. Roll back if exception raised
        Excepts SQLAlchemy errors and rollbacks if they're caught

        :raises storage.exceptions.StorageError: wrapping the SQL error text
        """
        try:
            self._session.commit()
        except sql_errors as e:
            self._session.rollback()
            raise storage.exceptions.StorageError(
                'SQL Storage error: {0}'.format(str(e))
            )

    def _get_base_query(self, include, joins):
        """Create the initial query from the model class and included columns

        :param include: A (possibly empty) list of columns to include in
            the query
        :param joins: A (possibly empty) list of models on which the query
            should join
        :return: An SQLAlchemy query object
        """

        # If only some columns are included, query through the session object
        if include:
            query = self._session.query(*include)
        else:
            # If all columns should be returned, query directly from the model
            query = self._session.query(self.model_cls)

        # Add any joins that might be necessary
        for join_model in joins:
            query = query.join(join_model)

        return query

    @staticmethod
    def _sort_query(query, sort=None):
        """Add sorting clauses to the query

        :param query: Base SQL query
        :param sort: An optional dictionary where keys are columns to
            sort by, and values are the order (asc/desc)
        :return: An SQLAlchemy query object
        """
        if sort:
            for column, order in sort.items():
                # Any order other than 'desc' sorts ascending
                if order == 'desc':
                    column = column.desc()
                query = query.order_by(column)
        return query

    @staticmethod
    def _filter_query(query, filters):
        """Add filter clauses to the query

        :param query: Base SQL query
        :param filters: A dictionary where keys are columns to filter by,
            and values are values applicable for those columns (or lists
            of such values)
        :return: An SQLAlchemy query object
        """
        for column, value in filters.items():
            # If there are multiple values, use `in_`, otherwise, use `eq`
            if isinstance(value, (list, tuple)):
                query = query.filter(column.in_(value))
            else:
                query = query.filter(column == value)

        return query

    def _get_query(self,
                   include=None,
                   filters=None,
                   sort=None):
        """Get an SQL query object based on the params passed

        :param include: An optional list of columns to include in the query
        :param filters: An optional dictionary where keys are column names to
            filter by, and values are values applicable for those columns (or
            lists of such values)
        :param sort: An optional dictionary where keys are column names to
            sort by, and values are the order (asc/desc)
        :return: A sorted and filtered query with only the relevant
            columns
        """

        include = include or []
        filters = filters or dict()
        sort = sort or OrderedDict()

        # The join list is derived from the column *names*, so it has to be
        # computed before the names are replaced by column objects.
        joins = self._get_join_models_list(include, filters, sort)
        include, filters, sort = self._get_columns_from_field_names(
            include, filters, sort
        )

        query = self._get_base_query(include, joins)
        query = self._filter_query(query, filters)
        query = self._sort_query(query, sort)
        return query

    def _get_columns_from_field_names(self,
                                      include,
                                      filters,
                                      sort):
        """Go over the optional parameters (include, filters, sort), and
        replace column names with actual SQLA column objects

        :return: an (include, filters, sort) tuple keyed by column objects
        """
        all_includes = [self._get_column(c) for c in include]
        include = []
        # Columns that are inferred from properties (Labels) should be included
        # last for the following joins to work properly
        # (regular columns are inserted at the front, so they end up in
        # reverse order relative to the request)
        for col in all_includes:
            if isinstance(col, Label):
                include.append(col)
            else:
                include.insert(0, col)

        filters = dict((self._get_column(c), filters[c]) for c in filters)
        sort = OrderedDict((self._get_column(c), sort[c]) for c in sort)

        return include, filters, sort

    def _get_join_models_list(self, include, filters, sort):
        """Return a list of models on which the query should be joined, as
        inferred from the include, filter and sort column names
        """
        if not self.model_cls.is_resource:
            return []

        # Build plain lists of names: dict key views cannot be concatenated
        # with `+` on Python 3.
        all_column_names = list(include) + list(filters) + list(sort)
        join_columns = set(column_name for column_name in all_column_names
                           if self._is_join_column(column_name))

        # If the only columns included are the columns on which we would
        # normally join, there isn't actually a need to join, as the FROM
        # clause in the query will be generated from the relevant models anyway
        # NOTE(review): with more than one join column this comparison
        # depends on set iteration order - confirm the intended behavior.
        if include == list(join_columns):
            return []

        # Initializing a set, because the same model can appear in several
        # join lists
        join_models = set()
        for column_name in join_columns:
            join_models.update(
                self.model_cls.join_properties[column_name]['models']
            )
        # Sort the models by their correct join order
        join_models = sorted(join_models,
                             key=lambda model: model.join_order, reverse=True)

        return join_models

    def _is_join_column(self, column_name):
        """Return False if the column name corresponds to a regular SQLA
        column that `model_class` has.
        Return True if the column that should be used is a join column (see
        SQLModelBase for an explanation)
        """
        return self.model_cls.is_resource and \
            column_name in self.model_cls.join_properties

    def _get_column(self, column_name):
        """Return the column on which an action (filtering, sorting, etc.)
        would need to be performed. Can be either an attribute of the class,
        or needs to be inferred from the class' `join_properties` property
        """
        if self._is_join_column(column_name):
            return self.model_cls.join_properties[column_name]['column']
        else:
            return getattr(self.model_cls, column_name)

    # TODO is this really needed in aria?
    @staticmethod
    def _paginate(query, pagination):
        """Paginate the query by size and offset

        :param query: Current SQLAlchemy query object
        :param pagination: An optional dict with size and offset keys
        :return: A tuple with four elements:
            - results: `size` items starting from `offset`
            - the total count of items
            - `size` [default: 0]
            - `offset` [default: 0]
        """
        if pagination:
            # NOTE(review): a missing `size` becomes LIMIT 0, i.e. an empty
            # page - confirm callers always provide it.
            size = pagination.get('size', 0)
            offset = pagination.get('offset', 0)
            total = query.order_by(None).count()  # Fastest way to count
            results = query.limit(size).offset(offset).all()
            return results, total, size, offset
        else:
            results = query.all()
            return results, len(results), 0, 0

    @staticmethod
    def _load_properties(instance):
        """A helper method used to overcome a problem where the properties
        that rely on joins aren't being loaded automatically
        """
        if instance.is_resource:
            for prop in instance.join_properties:
                # Attribute access alone triggers the load; the value is
                # intentionally discarded.
                getattr(instance, prop)


class ListResult(object):
    """
    a ListResult contains results about the requested items.
    """
    def __init__(self, items, metadata):
        self.items = items
        self.metadata = metadata

    def __len__(self):
        return len(self.items)

    def __iter__(self):
        return iter(self.items)

    def __getitem__(self, item):
        return self.items[item]
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d768e61/aria/storage/models.py ---------------------------------------------------------------------- diff --git a/aria/storage/models.py b/aria/storage/models.py index d24ad75..c04f7d8 100644 --- a/aria/storage/models.py +++ b/aria/storage/models.py @@ -36,14 +36,27 @@ classes: * ProviderContext - provider context implementation model. * Plugin - plugin implementation model. """ - from datetime import datetime -from types import NoneType - -from .structures import Field, IterPointerField, Model, uuid_generator, PointerField +from uuid import uuid4 + +from .structures import ( + SQLModelBase, + Column, + Integer, + Text, + DateTime, + Boolean, + Enum, + String, + PickleType, + Float, + MutableDict, + Dict, + foreign_key, + one_to_many_relationship +) __all__ = ( - 'Model', 'Blueprint', 'Snapshot', 'Deployment', @@ -60,146 +73,111 @@ __all__ = ( ) # todo: sort this, maybe move from mgr or move from aria??? -ACTION_TYPES = () -ENTITY_TYPES = () +# TODO: this must change +ACTION_TYPES = ('a') +ENTITY_TYPES = ('b') + + +def uuid_generator(): + """ + wrapper function which generates ids + """ + return str(uuid4()) -class Blueprint(Model): +class Blueprint(SQLModelBase): """ - A Model which represents a blueprint + Blueprint model representation. 
""" - plan = Field(type=dict) - id = Field(type=basestring, default=uuid_generator) - description = Field(type=(basestring, NoneType)) - created_at = Field(type=datetime) - updated_at = Field(type=datetime) - main_file_name = Field(type=basestring) + __tablename__ = 'blueprints' + + storage_id = Column(Integer, primary_key=True, autoincrement=True) + id = Column(Text, index=True) + created_at = Column(DateTime, nullable=False, index=True) + main_file_name = Column(Text, nullable=False) + plan = Column(MutableDict.as_mutable(Dict), nullable=False) + updated_at = Column(DateTime) + description = Column(Text) -class Snapshot(Model): + +class Snapshot(SQLModelBase): """ - A Model which represents a snapshot + Snapshot model representation. """ + __tablename__ = 'snapshots' + CREATED = 'created' FAILED = 'failed' CREATING = 'creating' UPLOADED = 'uploaded' + + STATES = [CREATED, FAILED, CREATING, UPLOADED] END_STATES = [CREATED, FAILED, UPLOADED] - id = Field(type=basestring, default=uuid_generator) - created_at = Field(type=datetime) - status = Field(type=basestring) - error = Field(type=basestring, default=None) + storage_id = Column(Integer, primary_key=True, autoincrement=True) + id = Column(Text, index=True) + created_at = Column(DateTime, nullable=False, index=True) + status = Column(Enum(*STATES, name='snapshot_status')) + error = Column(Text) -class Deployment(Model): - """ - A Model which represents a deployment - """ - id = Field(type=basestring, default=uuid_generator) - description = Field(type=(basestring, NoneType)) - created_at = Field(type=datetime) - updated_at = Field(type=datetime) - blueprint_id = Field(type=basestring) - workflows = Field(type=dict) - inputs = Field(type=dict, default=lambda: {}) - policy_types = Field(type=dict, default=lambda: {}) - policy_triggers = Field(type=dict, default=lambda: {}) - groups = Field(type=dict, default=lambda: {}) - outputs = Field(type=dict, default=lambda: {}) - scaling_groups = Field(type=dict, 
default=lambda: {}) - - -class DeploymentUpdateStep(Model): + +class Deployment(SQLModelBase): """ - A Model which represents a deployment update step + Deployment model representation. """ - id = Field(type=basestring, default=uuid_generator) - action = Field(type=basestring, choices=ACTION_TYPES) - entity_type = Field(type=basestring, choices=ENTITY_TYPES) - entity_id = Field(type=basestring) - supported = Field(type=bool, default=True) - - def __hash__(self): - return hash((self.id, self.entity_id)) + __tablename__ = 'deployments' + + # See base class for an explanation on these properties + join_properties = { + 'blueprint_id': { + # No need to provide the Blueprint table, as it's already joined + 'models': [Blueprint], + 'column': Blueprint.id.label('blueprint_id') + }, + } + join_order = 2 + + _private_fields = ['blueprint_storage_id'] + + storage_id = Column(Integer, primary_key=True, autoincrement=True) + id = Column(Text, index=True) + + created_at = Column(DateTime, nullable=False, index=True) + description = Column(Text) + inputs = Column(MutableDict.as_mutable(Dict)) + groups = Column(MutableDict.as_mutable(Dict)) + permalink = Column(Text) + policy_triggers = Column(MutableDict.as_mutable(Dict)) + policy_types = Column(MutableDict.as_mutable(Dict)) + outputs = Column(MutableDict.as_mutable(Dict)) + scaling_groups = Column(MutableDict.as_mutable(Dict)) + updated_at = Column(DateTime) + workflows = Column(MutableDict.as_mutable(Dict)) + + blueprint_storage_id = foreign_key(Blueprint) + blueprint = one_to_many_relationship( + child_class_name='Deployment', + column_name='blueprint_storage_id', + parent_class_name='Blueprint', + back_reference_name='deployments' + ) - def __lt__(self, other): + @property + def blueprint_id(self): """ - the order is 'remove' < 'modify' < 'add' - :param other: + Returns the blueprint id :return: """ - if not isinstance(other, self.__class__): - return not self >= other - - if self.action != other.action: - if self.action == 
'remove': - return_value = True - elif self.action == 'add': - return_value = False - else: - return_value = other.action == 'add' - return return_value - - if self.action == 'add': - return self.entity_type == 'node' and other.entity_type == 'relationship' - if self.action == 'remove': - return self.entity_type == 'relationship' and other.entity_type == 'node' - return False - - -class DeploymentUpdate(Model): - """ - A Model which represents a deployment update - """ - INITIALIZING = 'initializing' - SUCCESSFUL = 'successful' - UPDATING = 'updating' - FINALIZING = 'finalizing' - EXECUTING_WORKFLOW = 'executing_workflow' - FAILED = 'failed' + return self.blueprint.id - STATES = [ - INITIALIZING, - SUCCESSFUL, - UPDATING, - FINALIZING, - EXECUTING_WORKFLOW, - FAILED, - ] - - # '{0}-{1}'.format(kwargs['deployment_id'], uuid4()) - id = Field(type=basestring, default=uuid_generator) - deployment_id = Field(type=basestring) - state = Field(type=basestring, choices=STATES, default=INITIALIZING) - deployment_plan = Field() - deployment_update_nodes = Field(default=None) - deployment_update_node_instances = Field(default=None) - deployment_update_deployment = Field(default=None) - modified_entity_ids = Field(default=None) - execution_id = Field(type=basestring) - steps = IterPointerField(type=DeploymentUpdateStep, default=()) - - -class Execution(Model): + +class Execution(SQLModelBase): """ - A Model which represents an execution + Execution model representation. 
""" - - class _Validation(object): - - @staticmethod - def execution_status_transition_validation(_, value, instance): - """Validation function that verifies execution status transitions are OK""" - try: - current_status = instance.status - except AttributeError: - return - valid_transitions = Execution.VALID_TRANSITIONS.get(current_status, []) - if current_status != value and value not in valid_transitions: - raise ValueError('Cannot change execution status from {current} to {new}'.format( - current=current_status, - new=value)) + __tablename__ = 'executions' TERMINATED = 'terminated' FAILED = 'failed' @@ -207,206 +185,500 @@ class Execution(Model): PENDING = 'pending' STARTED = 'started' CANCELLING = 'cancelling' - STATES = ( - TERMINATED, - FAILED, - CANCELLED, - PENDING, - STARTED, - CANCELLING, - ) + FORCE_CANCELLING = 'force_cancelling' + + STATES = [TERMINATED, FAILED, CANCELLED, PENDING, STARTED, + CANCELLING, FORCE_CANCELLING] END_STATES = [TERMINATED, FAILED, CANCELLED] ACTIVE_STATES = [state for state in STATES if state not in END_STATES] - VALID_TRANSITIONS = { - PENDING: [STARTED, CANCELLED], - STARTED: END_STATES + [CANCELLING], - CANCELLING: END_STATES + + # See base class for an explanation on these properties + join_properties = { + 'blueprint_id': { + 'models': [Deployment, Blueprint], + 'column': Blueprint.id.label('blueprint_id') + }, + 'deployment_id': { + 'models': [Deployment], + 'column': Deployment.id.label('deployment_id') + } } + join_order = 3 + + _private_fields = ['deployment_storage_id'] + + storage_id = Column(Integer, primary_key=True, autoincrement=True) + id = Column(Text, index=True) + + created_at = Column(DateTime, index=True) + started_at = Column(DateTime, nullable=True, index=True) + ended_at = Column(DateTime, nullable=True, index=True) + error = Column(Text, nullable=True) + is_system_workflow = Column(Boolean, nullable=False, default=False) + parameters = Column(MutableDict.as_mutable(Dict)) + status = 
Column(Enum(*STATES, name='execution_status')) + workflow_id = Column(Text, nullable=False) + + deployment_storage_id = foreign_key(Deployment, nullable=True) + deployment = one_to_many_relationship( + child_class_name='Execution', + column_name='deployment_storage_id', + parent_class_name='Deployment', + back_reference_name='executions' + ) + + @property + def deployment_id(self): + """ + Returns the deployment id + :return: + """ + return self.deployment.id if self.deployment else None - id = Field(type=basestring, default=uuid_generator) - status = Field(type=basestring, choices=STATES, - validation_func=_Validation.execution_status_transition_validation) - deployment_id = Field(type=basestring) - workflow_id = Field(type=basestring) - blueprint_id = Field(type=basestring) - created_at = Field(type=datetime, default=datetime.utcnow) - started_at = Field(type=datetime, default=None) - ended_at = Field(type=datetime, default=None) - error = Field(type=basestring, default=None) - parameters = Field() + @property + def blueprint_id(self): + """ + Returns the blueprint id + :return: + """ + return self.deployment.blueprint_id if self.deployment else None + def __str__(self): + id_name, id_value = self._get_unique_id() + return '<{0} {1}=`{2}` (status={3})>'.format( + self.__class__.__name__, + id_name, + id_value, + self.status + ) -class Relationship(Model): - """ - A Model which represents a relationship - """ - id = Field(type=basestring, default=uuid_generator) - source_id = Field(type=basestring) - target_id = Field(type=basestring) - source_interfaces = Field(type=dict) - source_operations = Field(type=dict) - target_interfaces = Field(type=dict) - target_operations = Field(type=dict) - type = Field(type=basestring) - type_hierarchy = Field(type=list) - properties = Field(type=dict) - - -class Node(Model): + +class DeploymentUpdate(SQLModelBase): """ - A Model which represents a node + Deployment update model representation. 
""" - id = Field(type=basestring, default=uuid_generator) - blueprint_id = Field(type=basestring) - type = Field(type=basestring) - type_hierarchy = Field() - number_of_instances = Field(type=int) - planned_number_of_instances = Field(type=int) - deploy_number_of_instances = Field(type=int) - host_id = Field(type=basestring, default=None) - properties = Field(type=dict) - operations = Field(type=dict) - plugins = Field(type=list, default=()) - relationships = IterPointerField(type=Relationship) - plugins_to_install = Field(type=list, default=()) - min_number_of_instances = Field(type=int) - max_number_of_instances = Field(type=int) - - def relationships_by_target(self, target_id): + __tablename__ = 'deployment_updates' + + # See base class for an explanation on these properties + join_properties = { + 'execution_id': { + 'models': [Execution], + 'column': Execution.id.label('execution_id') + }, + 'deployment_id': { + 'models': [Deployment], + 'column': Deployment.id.label('deployment_id') + }, + } + join_order = 4 + + _private_fields = ['execution_storage_id'] + + storage_id = Column(Integer, primary_key=True, autoincrement=True) + id = Column(Text, index=True) + + created_at = Column(DateTime, nullable=False, index=True) + deployment_plan = Column(MutableDict.as_mutable(Dict)) + deployment_update_node_instances = Column(MutableDict.as_mutable( + Dict)) + deployment_update_deployment = Column(MutableDict.as_mutable(Dict)) + deployment_update_nodes = Column(MutableDict.as_mutable(Dict)) + modified_entity_ids = Column(MutableDict.as_mutable(Dict)) + state = Column(Text) + + execution_storage_id = foreign_key(Execution, nullable=True) + execution = one_to_many_relationship( + child_class_name='DeploymentUpdate', + column_name='execution_storage_id', + parent_class_name='Execution', + back_reference_name='deployment_updates' + ) + + deployment_storage_id = foreign_key(Deployment) + deployment = one_to_many_relationship( + child_class_name='DeploymentUpdate', + 
column_name='deployment_storage_id', + parent_class_name='Deployment', + back_reference_name='deployment_updates' + ) + + @property + def execution_id(self): """ - Retreives all of the relationship by target. - :param target_id: the node id of the target of the relationship - :yields: a relationship which target and node with the specified target_id + Returns the execution id + :return: """ - for relationship in self.relationships: - if relationship.target_id == target_id: - yield relationship - # todo: maybe add here Exception if isn't exists (didn't yield one's) + return self.execution.id if self.execution else None + @property + def deployment_id(self): + """ + Returns the deployment id + :return: + """ + return self.deployment.id -class RelationshipInstance(Model): - """ - A Model which represents a relationship instance - """ - id = Field(type=basestring, default=uuid_generator) - target_id = Field(type=basestring) - target_name = Field(type=basestring) - source_id = Field(type=basestring) - source_name = Field(type=basestring) - type = Field(type=basestring) - relationship = PointerField(type=Relationship) + def to_dict(self, suppress_error=False, **kwargs): + dep_update_dict = super(DeploymentUpdate, self).to_dict(suppress_error) + # Taking care of the fact the DeploymentSteps are objects + dep_update_dict['steps'] = [step.to_dict() for step in self.steps] + return dep_update_dict -class NodeInstance(Model): +class DeploymentUpdateStep(SQLModelBase): """ - A Model which represents a node instance + Deployment update step model representation. 
""" - # todo: add statuses - UNINITIALIZED = 'uninitialized' - INITIALIZING = 'initializing' - CREATING = 'creating' - CONFIGURING = 'configuring' - STARTING = 'starting' - DELETED = 'deleted' - STOPPING = 'stopping' - DELETING = 'deleting' - STATES = ( - UNINITIALIZED, - INITIALIZING, - CREATING, - CONFIGURING, - STARTING, - DELETED, - STOPPING, - DELETING + __tablename__ = 'deployment_update_steps' + + # See base class for an explanation on these properties + join_properties = { + 'deployment_update_id': { + 'models': [DeploymentUpdate], + 'column': DeploymentUpdate.id.label('deployment_update_id') + }, + } + join_order = 5 + + _private_fields = ['deployment_update_storage_id'] + + id = Column(Integer, primary_key=True, autoincrement=True) + + action = Column(Enum(*ACTION_TYPES, name='action_type')) + entity_id = Column(Text, nullable=False) + entity_type = Column(Enum(*ENTITY_TYPES, name='entity_type')) + + deployment_update_storage_id = foreign_key(DeploymentUpdate) + deployment_update = one_to_many_relationship( + child_class_name='DeploymentUpdateStep', + column_name='deployment_update_storage_id', + parent_class_name='DeploymentUpdate', + back_reference_name='steps' ) - id = Field(type=basestring, default=uuid_generator) - deployment_id = Field(type=basestring) - runtime_properties = Field(type=dict) - state = Field(type=basestring, choices=STATES, default=UNINITIALIZED) - version = Field(type=(basestring, NoneType)) - relationship_instances = IterPointerField(type=RelationshipInstance) - node = PointerField(type=Node) - host_id = Field(type=basestring, default=None) - scaling_groups = Field(default=()) - - def relationships_by_target(self, target_id): + @property + def deployment_update_id(self): """ - Retreives all of the relationship by target. 
- :param target_id: the instance id of the target of the relationship - :yields: a relationship instance which target and node with the specified target_id + Returns the deployment update id + :return: """ - for relationship_instance in self.relationship_instances: - if relationship_instance.target_id == target_id: - yield relationship_instance - # todo: maybe add here Exception if isn't exists (didn't yield one's) + return self.deployment_update.id -class DeploymentModification(Model): +class DeploymentModification(SQLModelBase): """ - A Model which represents a deployment modification + Deployment modification model representation. """ + __tablename__ = 'deployment_modifications' + STARTED = 'started' FINISHED = 'finished' ROLLEDBACK = 'rolledback' + + STATES = [STARTED, FINISHED, ROLLEDBACK] END_STATES = [FINISHED, ROLLEDBACK] - id = Field(type=basestring, default=uuid_generator) - deployment_id = Field(type=basestring) - modified_nodes = Field(type=(dict, NoneType)) - added_and_related = IterPointerField(type=NodeInstance) - removed_and_related = IterPointerField(type=NodeInstance) - extended_and_related = IterPointerField(type=NodeInstance) - reduced_and_related = IterPointerField(type=NodeInstance) - # before_modification = IterPointerField(type=NodeInstance) - status = Field(type=basestring, choices=(STARTED, FINISHED, ROLLEDBACK)) - created_at = Field(type=datetime) - ended_at = Field(type=(datetime, NoneType)) - context = Field() - - -class ProviderContext(Model): + # See base class for an explanation on these properties + join_properties = { + 'deployment_id': { + 'models': [Deployment], + 'column': Deployment.id.label('deployment_id') + }, + } + join_order = 3 + + _private_fields = ['deployment_storage_id'] + + storage_id = Column(Integer, primary_key=True, autoincrement=True) + id = Column(Text, index=True) + + context = Column(MutableDict.as_mutable(Dict)) + created_at = Column(DateTime, nullable=False, index=True) + ended_at = Column(DateTime, 
index=True) + modified_nodes = Column(MutableDict.as_mutable(Dict)) + node_instances = Column(MutableDict.as_mutable(Dict)) + status = Column( + Enum(*STATES, name='deployment_modification_status')) + + deployment_storage_id = foreign_key(Deployment) + deployment = one_to_many_relationship( + child_class_name='DeploymentModification', + column_name='deployment_storage_id', + parent_class_name='Deployment', + back_reference_name='modifications' + ) + + @property + def deployment_id(self): + """ + Returns the deployment id + :return: + """ + return self.deployment.id + + +class Node(SQLModelBase): + """ + Node model representation. + """ + __tablename__ = 'nodes' + + # See base class for an explanation on these properties + is_id_unique = False + join_properties = { + 'blueprint_id': { + 'models': [Deployment, Blueprint], + 'column': Blueprint.id.label('blueprint_id') + }, + 'deployment_id': { + 'models': [Deployment], + 'column': Deployment.id.label('deployment_id') + }, + } + join_order = 3 + + _private_fields = ['deployment_storage_id'] + + storage_id = Column(Integer, primary_key=True, autoincrement=True) + id = Column(Text, index=True) + + deploy_number_of_instances = Column(Integer, nullable=False) + # TODO: This probably should be a foreign key, but there's no guarantee + # in the code, currently, that the host will be created beforehand + host_id = Column(Text) + max_number_of_instances = Column(Integer, nullable=False) + min_number_of_instances = Column(Integer, nullable=False) + number_of_instances = Column(Integer, nullable=False) + planned_number_of_instances = Column(Integer, nullable=False) + plugins = Column(MutableDict.as_mutable(Dict)) + plugins_to_install = Column(MutableDict.as_mutable(Dict)) + properties = Column(MutableDict.as_mutable(Dict)) + operations = Column(MutableDict.as_mutable(Dict)) + type = Column(Text, nullable=False, index=True) + type_hierarchy = Column(PickleType) + + deployment_storage_id = foreign_key(Deployment) + deployment = 
one_to_many_relationship( + child_class_name='Node', + column_name='deployment_storage_id', + parent_class_name='Deployment', + back_reference_name='nodes' + ) + + @property + def deployment_id(self): + """ + Returns the deployment id + :return: + """ + return self.deployment.id + + @property + def blueprint_id(self): + """ + Returns the blueprint id + :return: + """ + return self.deployment.blueprint_id + + +class Relationship(SQLModelBase): + """ + Relationship model representation. + """ + __tablename__ = 'relationships' + + join_properties = { + 'blueprint_id': { + 'models': [Node, Deployment, Blueprint], + 'column': Blueprint.id.label('blueprint_id') + }, + 'deployment_id': { + 'models': [Node, Deployment], + 'column': Deployment.id.label('deployment_id') + } + } + join_order = 4 + _private_fields = ['relationship_storage_source_node_id', + 'relationship_storage_target_node_id'] + + storage_id = Column(Integer, primary_key=True, autoincrement=True) + id = Column(Text, index=True) + + source_interfaces = Column(MutableDict.as_mutable(Dict)) + source_operations = Column(MutableDict.as_mutable(Dict)) + target_interfaces = Column(MutableDict.as_mutable(Dict)) + target_operations = Column(MutableDict.as_mutable(Dict)) + type = Column(String) + type_hierarchy = Column(PickleType) # TODO: this should be list + properties = Column(MutableDict.as_mutable(Dict)) + + source_node_storage_id = foreign_key(Node) + target_node_storage_id = foreign_key(Node) + + source_node = one_to_many_relationship( + child_class_name='Relationship', + column_name='source_node_storage_id', + parent_class_name='Node', + back_reference_name='relationship_source' + ) + target_node = one_to_many_relationship( + child_class_name='Relationship', + column_name='target_node_storage_id', + parent_class_name='Node', + back_reference_name='relationship_target' + ) + + +class NodeInstance(SQLModelBase): + """ + Node instance model representation. 
+ """ + __tablename__ = 'node_instances' + + # See base class for an explanation on these properties + join_properties = { + 'node_id': { + 'models': [Node], + 'column': Node.id.label('node_id') + }, + 'deployment_id': { + 'models': [Node, Deployment], + 'column': Deployment.id.label('deployment_id') + }, + } + join_order = 4 + + _private_fields = ['node_storage_id', 'deployment_storage_id'] + + storage_id = Column(Integer, primary_key=True, autoincrement=True) + id = Column(Text, index=True) + + # TODO: This probably should be a foreign key, but there's no guarantee + # in the code, currently, that the host will be created beforehand + host_id = Column(Text) + runtime_properties = Column(MutableDict.as_mutable(Dict)) + scaling_groups = Column(MutableDict.as_mutable(Dict)) + state = Column(Text, nullable=False) + version = Column(Integer, default=1) + + node_storage_id = foreign_key(Node) + node = one_to_many_relationship( + child_class_name='NodeInstance', + column_name='node_storage_id', + parent_class_name='Node', + back_reference_name='node_instances' + ) + + @property + def node_id(self): + """ + Returns the node id + :return: + """ + return self.node.id + + deployment_storage_id = foreign_key(Deployment) + deployment = one_to_many_relationship( + child_class_name='NodeInstance', + column_name='deployment_storage_id', + parent_class_name='Deployment', + back_reference_name='node_instances' + ) + + +class RelationshipInstance(SQLModelBase): + """ + Relationship instance model representation. 
+ """ + __tablename__ = 'relationship_instances' + + join_properties = { + 'blueprint_id': { + 'models': [Relationship, Node, Deployment, Blueprint], + 'column': Blueprint.id.label('blueprint_id') + }, + 'deployment_id': { + 'models': [Relationship, Node, Deployment], + 'column': Deployment.id.label('deployment_id') + } + } + join_order = 5 + + _private_fields = ['relationship_storage_id', + 'source_node_instance_id', + 'target_node_instance_id'] + + storage_id = Column(Integer, primary_key=True, autoincrement=True) + id = Column(Text, index=True) + + type = Column(String) + + source_node_instance_storage_id = foreign_key(NodeInstance) + source_node_instance = one_to_many_relationship( + child_class_name='RelationshipInstance', + column_name='source_node_instance_storage_id', + parent_class_name='NodeInstance', + back_reference_name='relationship_instance_source' + ) + target_node_instance_storage_id = foreign_key(NodeInstance) + target_node_instance = one_to_many_relationship( + child_class_name='RelationshipInstance', + column_name='target_node_instance_storage_id', + parent_class_name='NodeInstance', + back_reference_name='relationship_instance_target' + ) + relationship_storage_id = foreign_key(Relationship) + relationship = one_to_many_relationship( + child_class_name='RelationshipInstance', + column_name='relationship_storage_id', + parent_class_name='Relationship', + back_reference_name='relationship_instances' + ) + + +class ProviderContext(SQLModelBase): """ - A Model which represents a provider context + Provider context model representation. """ - id = Field(type=basestring, default=uuid_generator) - context = Field(type=dict) - name = Field(type=basestring) + __tablename__ = 'provider_context' + + id = Column(Text, primary_key=True) + name = Column(Text, nullable=False) + context = Column(MutableDict.as_mutable(Dict), nullable=False) -class Plugin(Model): +class Plugin(SQLModelBase): """ - A Model which represents a plugin + Plugin model representation. 
""" - id = Field(type=basestring, default=uuid_generator) - package_name = Field(type=basestring) - archive_name = Field(type=basestring) - package_source = Field(type=dict) - package_version = Field(type=basestring) - supported_platform = Field(type=basestring) - distribution = Field(type=basestring) - distribution_version = Field(type=basestring) - distribution_release = Field(type=basestring) - wheels = Field() - excluded_wheels = Field() - supported_py_versions = Field(type=list) - uploaded_at = Field(type=datetime) - - -class Task(Model): + __tablename__ = 'plugins' + + storage_id = Column(Integer, primary_key=True, autoincrement=True) + id = Column(Text, index=True) + + archive_name = Column(Text, nullable=False, index=True) + distribution = Column(Text) + distribution_release = Column(Text) + distribution_version = Column(Text) + excluded_wheels = Column(MutableDict.as_mutable(Dict)) + package_name = Column(Text, nullable=False, index=True) + package_source = Column(Text) + package_version = Column(Text) + supported_platform = Column(MutableDict.as_mutable(Dict)) + supported_py_versions = Column(MutableDict.as_mutable(Dict)) + uploaded_at = Column(DateTime, nullable=False, index=True) + wheels = Column(MutableDict.as_mutable(Dict), nullable=False) + + +class Task(SQLModelBase): """ A Model which represents an task """ - class _Validation(object): + __tablename__ = 'task' - @staticmethod - def validate_max_attempts(_, value, *args): - """Validates that max attempts is either -1 or a positive number""" - if value < 1 and value != Task.INFINITE_RETRIES: - raise ValueError('Max attempts can be either -1 (infinite) or any positive number. 
' - 'Got {value}'.format(value=value)) + _private_fields = ['node_instance_storage_id', 'relationship_instance_storage_id'] PENDING = 'pending' RETRYING = 'retrying' @@ -422,23 +694,75 @@ class Task(Model): SUCCESS, FAILED, ) + WAIT_STATES = [PENDING, RETRYING] END_STATES = [SUCCESS, FAILED] + + class _Validation(object): + + @staticmethod + def validate_max_attempts(_, value, *args): + """Validates that max attempts is either -1 or a positive number""" + if value < 1 and value != Task.INFINITE_RETRIES: + raise ValueError('Max attempts can be either -1 (infinite) or any positive number. ' + 'Got {value}'.format(value=value)) + INFINITE_RETRIES = -1 - id = Field(type=basestring, default=uuid_generator) - status = Field(type=basestring, choices=STATES, default=PENDING) - execution_id = Field(type=basestring) - due_at = Field(type=datetime, default=datetime.utcnow) - started_at = Field(type=datetime, default=None) - ended_at = Field(type=datetime, default=None) - max_attempts = Field(type=int, default=1, validation_func=_Validation.validate_max_attempts) - retry_count = Field(type=int, default=0) - retry_interval = Field(type=(int, float), default=0) - ignore_failure = Field(type=bool, default=False) + id = Column(String, primary_key=True, default=uuid_generator) + status = Column(Enum(*STATES), name='status', default=PENDING) + + execution_id = Column(String) + due_at = Column(DateTime, default=datetime.utcnow, nullable=True) + started_at = Column(DateTime, default=None, nullable=True) + ended_at = Column(DateTime, default=None, nullable=True) + # , validation_func=_Validation.validate_max_attempts) + max_attempts = Column(Integer, default=1) + retry_count = Column(Integer, default=0) + retry_interval = Column(Float, default=0) + ignore_failure = Column(Boolean, default=False) # Operation specific fields - name = Field(type=basestring) - operation_mapping = Field(type=basestring) - actor = Field() - inputs = Field(type=dict, default=lambda: {}) + name = 
Column(String) + operation_mapping = Column(String) + inputs = Column(MutableDict.as_mutable(Dict)) + + node_instance_storage_id = foreign_key(NodeInstance, nullable=True) + relationship_instance_storage_id = foreign_key(RelationshipInstance, nullable=True) + + node_instance = one_to_many_relationship( + child_class_name='Task', + column_name='node_instance_storage_id', + parent_class_name='NodeInstance', + back_reference_name='tasks', + ) + + relationship_instance = one_to_many_relationship( + child_class_name='Task', + column_name='relationship_instance_storage_id', + parent_class_name='RelationshipInstance', + back_reference_name='tasks', + ) + + @property + def actor_storage_id(self): + """ + Return the actor storage id of the task + :return: + """ + return self.node_instance_storage_id or self.relationship_instance_storage_id + + @property + def actor(self): + """ + Return the actor of the task + :return: + """ + return self.node_instance or self.relationship_instance + + def __init__(self, actor, **kwargs): + if isinstance(actor, RelationshipInstance): + kwargs['relationship_instance_storage_id'] = actor.storage_id + elif isinstance(actor, NodeInstance): + kwargs['node_instance_storage_id'] = actor.storage_id + super(Task, self).__init__(**kwargs) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d768e61/aria/storage/rapi/__init__.py ---------------------------------------------------------------------- diff --git a/aria/storage/rapi/__init__.py b/aria/storage/rapi/__init__.py new file mode 100644 index 0000000..2217281 --- /dev/null +++ b/aria/storage/rapi/__init__.py @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +A collection of RAPIs +""" +from .filesystem import FileSystemResourceAPI http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d768e61/aria/storage/rapi/filesystem.py ---------------------------------------------------------------------- diff --git a/aria/storage/rapi/filesystem.py b/aria/storage/rapi/filesystem.py new file mode 100644 index 0000000..a6c4ddf --- /dev/null +++ b/aria/storage/rapi/filesystem.py @@ -0,0 +1,119 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" +SQLalchemy based RAPI +""" +import os +import shutil +from distutils import dir_util +from functools import partial + +from aria.storage import ( + api, + filesystem_api, + exceptions +) + + +class FileSystemResourceAPI(api.ResourceAPI, filesystem_api.BaseFileSystemAPI): + """ + File system resource storage. + """ + + def __init__(self, directory, **kwargs): + """ + File system implementation for storage api. + :param str directory: root dir for storage. + """ + super(FileSystemResourceAPI, self).__init__(**kwargs) + self.directory = directory + self.base_path = os.path.join(self.directory, self.name) + self._join_path = partial(os.path.join, self.base_path) + + def __repr__(self): + return '{cls.__name__}(directory={self.directory})'.format( + cls=self.__class__, self=self) + + def create(self, **kwargs): + """ + Create directory in storage by path. + tries to create the root directory as well. + :param str name: path of file in storage. + """ + try: + os.makedirs(self.directory) + except (OSError, IOError): + pass + os.makedirs(self.base_path) + + def data(self, entry_id, path=None, **_): + """ + Retrieve the content of a file system storage resource. + + :param str entry_type: the type of the entry. + :param str entry_id: the id of the entry. + :param str path: a path to a specific resource. + :return: the content of the file + :rtype: bytes + """ + resource_relative_path = os.path.join(self.name, entry_id, path or '') + resource = os.path.join(self.directory, resource_relative_path) + if not os.path.exists(resource): + raise exceptions.StorageError("Resource {0} does not exist". 
+ format(resource_relative_path)) + if not os.path.isfile(resource): + resources = os.listdir(resource) + if len(resources) != 1: + raise exceptions.StorageError('No resource in path: {0}'.format(resource)) + resource = os.path.join(resource, resources[0]) + with open(resource, 'rb') as resource_file: + return resource_file.read() + + def download(self, entry_id, destination, path=None, **_): + """ + Download a specific file or dir from the file system resource storage. + + :param str entry_type: the name of the entry. + :param str entry_id: the id of the entry + :param str destination: the destination of the files. + :param str path: a path on the remote machine relative to the root of the entry. + """ + resource_relative_path = os.path.join(self.name, entry_id, path or '') + resource = os.path.join(self.directory, resource_relative_path) + if not os.path.exists(resource): + raise exceptions.StorageError("Resource {0} does not exist". + format(resource_relative_path)) + if os.path.isfile(resource): + shutil.copy2(resource, destination) + else: + dir_util.copy_tree(resource, destination) # pylint: disable=no-member + + def upload(self, entry_id, source, path=None, **_): + """ + Uploads a specific file or dir to the file system resource storage. + + :param str entry_type: the name of the entry. + :param str entry_id: the id of the entry + :param source: the source of the files to upload. + :param path: the destination of the file/s relative to the entry root dir. 
+ """ + resource_directory = os.path.join(self.directory, self.name, entry_id) + if not os.path.exists(resource_directory): + os.makedirs(resource_directory) + destination = os.path.join(resource_directory, path or '') + if os.path.isfile(source): + shutil.copy2(source, destination) + else: + dir_util.copy_tree(source, destination) # pylint: disable=no-member http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d768e61/aria/storage/structures.py ---------------------------------------------------------------------- diff --git a/aria/storage/structures.py b/aria/storage/structures.py index b02366e..b8b74fa 100644 --- a/aria/storage/structures.py +++ b/aria/storage/structures.py @@ -27,281 +27,237 @@ classes: * Model - abstract model implementation. """ import json -from itertools import count -from uuid import uuid4 - -from .exceptions import StorageError -from ..logger import LoggerMixin -from ..utils.validation import ValidatorMixin - -__all__ = ( - 'uuid_generator', - 'Field', - 'IterField', - 'PointerField', - 'IterPointerField', - 'Model', - 'Storage', + +import jsonpickle +from sqlalchemy import VARCHAR +from sqlalchemy.ext.mutable import Mutable +from sqlalchemy.ext.declarative import declarative_base +# pylint: disable=unused-import +from sqlalchemy import ( + schema, + Column, + Integer, + Text, + DateTime, + Boolean, + Enum, + String, + PickleType, + Float, + TypeDecorator, + ForeignKey, + orm, ) -def uuid_generator(): - """ - wrapper function which generates ids - """ - return str(uuid4()) +Model = declarative_base() +class classproperty(object): + """A class that acts a a decorator for class-level properties -class Field(ValidatorMixin): - """ - A single field implementation - """ - NO_DEFAULT = 'NO_DEFAULT' - - try: - # python 3 syntax - _next_id = count().__next__ - except AttributeError: - # python 2 syntax - _next_id = count().next - _ATTRIBUTE_NAME = '_cache_{0}'.format - - def __init__( - self, - type=None, - choices=(), - 
validation_func=None, - default=NO_DEFAULT, - **kwargs): - """ - Simple field manager. + class A(object): + _prop1 = 1 + _prop2 = 2 - :param type: possible type of the field. - :param choices: a set of possible field values. - :param default: default field value. - :param kwargs: kwargs to be passed to next in line classes. - """ - self.type = type - self.choices = choices - self.default = default - self.validation_func = validation_func - super(Field, self).__init__(**kwargs) - - def __get__(self, instance, owner): - if instance is None: - return self - field_name = self._field_name(instance) - try: - return getattr(instance, self._ATTRIBUTE_NAME(field_name)) - except AttributeError as exc: - if self.default == self.NO_DEFAULT: - raise AttributeError( - str(exc).replace(self._ATTRIBUTE_NAME(field_name), field_name)) - - default_value = self.default() if callable(self.default) else self.default - setattr(instance, self._ATTRIBUTE_NAME(field_name), default_value) - return default_value - - def __set__(self, instance, value): - field_name = self._field_name(instance) - self.validate_value(field_name, value, instance) - setattr(instance, self._ATTRIBUTE_NAME(field_name), value) - - def validate_value(self, name, value, instance): - """ - Validates the value of the field. + @classproperty + def foo(cls): + return cls._prop1 + cls._prop2 - :param name: the name of the field. - :param value: the value of the field. - :param instance: the instance containing the field. - """ - if self.default != self.NO_DEFAULT and value == self.default: - return - if self.type: - self.validate_instance(name, value, self.type) - if self.choices: - self.validate_in_choice(name, value, self.choices) - if self.validation_func: - self.validation_func(name, value, instance) - - def _field_name(self, instance): - """ - retrieves the field name from the instance. + And use it like this: + print A.foo # 3 - :param Field instance: the instance which holds the field. 
- :return: name of the field - :rtype: basestring - """ - for name, member in vars(instance.__class__).iteritems(): - if member is self: - return name + """ + def __init__(self, get_func): + self.get_func = get_func + + def __get__(self, owner_self, owner_cls): + return self.get_func(owner_cls) + + +def foreign_key( + parent_table, + id_col_name='storage_id', + nullable=False, + column_type=Integer +): + """Return a ForeignKey object with the relevant + + :param parent_table: SQL name of the parent table + :param id_col_name: Name of the parent table's ID column [default: `id`] + :param nullable: Should the column be allowed to remain empty + :param column_type: The type (integer/text/etc.) of the column + :return: + """ + return Column( + column_type, + ForeignKey( + '{0}.{1}'.format(parent_table.__tablename__, id_col_name), + ondelete='CASCADE' + ), + nullable=nullable + ) + + +def one_to_many_relationship( + child_class_name, + column_name, + parent_class_name, + back_reference_name, + parent_id_name='storage_id', +): + """Return a one-to-many SQL relationship object + Meant to be used from inside the *child* object + + :param child_class_name: Class name of the child table + :param column_name: Name of the column pointing to the parent table + :param parent_class_name: Class name of the parent table + :param back_reference_name: The name to give to the reference to the child + :param parent_id_name: Name of the parent table's ID column [default: `id`] + :return: + """ + return orm.relationship( + parent_class_name, + primaryjoin='{0}.{1} == {2}.{3}'.format( + child_class_name, + column_name, + parent_class_name, + parent_id_name + ), + # The following line make sure that when the *parent* is + # deleted, all its connected children are deleted as well + backref=orm.backref(back_reference_name, cascade='all') + ) + + +def many_to_many_relationship( + other_table_class_name, + connecting_table, + back_reference_name +): + """Return a many-to-many SQL relationship 
object + + :param other_table_class_name: The name of the table we're connecting to + :param connecting_table: The secondary table used in the relationship + :param back_reference_name: The name to give to the reference to the + current table from the other table + :return: + """ + return orm.relationship( + other_table_class_name, + secondary=connecting_table, + backref=orm.backref(back_reference_name, lazy='dynamic') + ) -class IterField(Field): +class Dict(TypeDecorator): """ - Represents an iterable field. + Dict representation of type. """ - def __init__(self, **kwargs): - """ - Simple iterable field manager. - This field type don't have choices option. - :param kwargs: kwargs to be passed to next in line classes. - """ - super(IterField, self).__init__(choices=(), **kwargs) + def process_literal_param(self, value, dialect): + pass - def validate_value(self, name, values, *args): - """ - Validates the value of each iterable value. + @property + def python_type(self): + return dict - :param name: the name of the field. - :param values: the values of the field. - """ - for value in values: - self.validate_instance(name, value, self.type) + impl = VARCHAR + def process_bind_param(self, value, dialect): + if value is not None: + value = json.dumps(value) + return value + + def process_result_value(self, value, dialect): + if value is not None: + value = json.loads(value) + return value -class PointerField(Field): - """ - A single pointer field implementation. - Any PointerField points via id to another document. +class MutableDict(Mutable, dict): """ + Enables tracking for dict values. + """ + @classmethod + def coerce(cls, key, value): + "Convert plain dictionaries to MutableDict." 
- def __init__(self, type, **kwargs): - assert issubclass(type, Model) - super(PointerField, self).__init__(type=type, **kwargs) + if not isinstance(value, MutableDict): + if isinstance(value, dict): + return MutableDict(value) + # this call will raise ValueError + return Mutable.coerce(key, value) + else: + return value -class IterPointerField(IterField, PointerField): - """ - An iterable pointers field. - - Any IterPointerField points via id to other documents. - """ - pass + def __setitem__(self, key, value): + "Detect dictionary set events and emit change events." + dict.__setitem__(self, key, value) + self.changed() -class Model(object): - """ - Base class for all of the storage models. - """ - id = None + def __delitem__(self, key): + "Detect dictionary del events and emit change events." - def __init__(self, **fields): - """ - Abstract class for any model in the storage. - The Initializer creates attributes according to the (keyword arguments) that given - Each value is validated according to the Field. - Each model has to have and ID Field. + dict.__delitem__(self, key) + self.changed() - :param fields: each item is validated and transformed into instance attributes. 
- """ - self._assert_model_have_id_field(**fields) - missing_fields, unexpected_fields = self._setup_fields(fields) - if missing_fields: - raise StorageError( - 'Model {name} got missing keyword arguments: {fields}'.format( - name=self.__class__.__name__, fields=missing_fields)) +class SQLModelBase(Model): + """Abstract base class for all SQL models that allows [de]serialization + """ + # SQLAlchemy syntax + __abstract__ = True - if unexpected_fields: - raise StorageError( - 'Model {name} got unexpected keyword arguments: {fields}'.format( - name=self.__class__.__name__, fields=unexpected_fields)) + # Indicates to the storage manager whether the table is a resource or not + is_resource = False - def __repr__(self): - return '{name}(fields={0})'.format(sorted(self.fields), name=self.__class__.__name__) + _private_fields = [] - def __eq__(self, other): - return ( - isinstance(other, self.__class__) and - self.fields_dict == other.fields_dict) + # Indicates whether the `id` column in this class should be unique + is_id_unique = True - @property - def fields(self): + def to_dict(self, **kwargs): """ - Iterates over the fields of the model. - :yields: the class's field name + Convert the model into dict + :return: """ - for name, field in vars(self.__class__).items(): - if isinstance(field, Field): - yield name + return dict((field, getattr(self, field)) for field in self.fields) - @property - def fields_dict(self): + def to_json(self): """ - Transforms the instance attributes into a dict. - - :return: all fields in dict format. - :rtype dict + Convert the model into json. 
+ :return: """ - return dict((name, getattr(self, name)) for name in self.fields) + return jsonpickle.encode(self.to_dict(), unpicklable=False) - @property - def json(self): + @classproperty + def fields(cls): + """Return the list of field names for this table + + Mostly for backwards compatibility in the code (that uses `fields`) """ - Transform the dict of attributes into json - :return: + return cls.__table__.columns.keys() + + def _get_unique_id(self): + """A method to allow classes to override the default representation """ - return json.dumps(self.fields_dict) + return 'id', self.id - @classmethod - def _assert_model_have_id_field(cls, **fields_initializer_values): - if not getattr(cls, 'id', None): - raise StorageError('Model {cls.__name__} must have id field'.format(cls=cls)) - - if cls.id.default == cls.id.NO_DEFAULT and 'id' not in fields_initializer_values: - raise StorageError( - 'Model {cls.__name__} is missing required ' - 'keyword-only argument: "id"'.format(cls=cls)) - - def _setup_fields(self, input_fields): - missing = [] - for field_name in self.fields: - try: - field_obj = input_fields.pop(field_name) - setattr(self, field_name, field_obj) - except KeyError: - field = getattr(self.__class__, field_name) - if field.default == field.NO_DEFAULT: - missing.append(field_name) - - unexpected_fields = input_fields.keys() - return missing, unexpected_fields - - -class Storage(LoggerMixin): - """ - Represents the storage - """ - def __init__(self, driver, items=(), **kwargs): - super(Storage, self).__init__(**kwargs) - self.driver = driver - self.registered = {} - for item in items: - self.register(item) - self.logger.debug('{name} object is ready: {0!r}'.format( - self, name=self.__class__.__name__)) + def __str__(self): + id_name, id_value = self._get_unique_id() + return '<{0} {1}=`{2}`>'.format( + self.__class__.__name__, + id_name, + id_value + ) def __repr__(self): - return '{name}(driver={self.driver})'.format( - name=self.__class__.__name__, 
self=self) + return str(self) - def __getattr__(self, item): - try: - return self.registered[item] - except KeyError: - return super(Storage, self).__getattribute__(item) + def __unicode__(self): + return str(self) - def setup(self): - """ - Setup and create all storage items - """ - for name, api in self.registered.iteritems(): - try: - api.create() - self.logger.debug( - 'setup {name} in storage {self!r}'.format(name=name, self=self)) - except StorageError: - pass + def __eq__(self, other): + return isinstance(other, self.__class__) and self.to_dict() == other.to_dict() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d768e61/requirements.txt ---------------------------------------------------------------------- diff --git a/requirements.txt b/requirements.txt index e6d5393..7e87c67 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,3 +23,4 @@ Jinja2==2.8 shortuuid==0.4.3 CacheControl[filecache]==0.11.6 clint==0.5.1 +SQLAlchemy==1.1.4 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d768e61/tests/mock/context.py ---------------------------------------------------------------------- diff --git a/tests/mock/context.py b/tests/mock/context.py index 5fda07e..0d09bb1 100644 --- a/tests/mock/context.py +++ b/tests/mock/context.py @@ -13,21 +13,59 @@ # See the License for the specific language governing permissions and # limitations under the License. +import pytest + + from aria import application_model_storage from aria.orchestrator import context +from aria.storage.mapi import SQLAlchemyModelAPI + +from tests.storage import get_sqlite_api_params from . 
import models -from ..storage import InMemoryModelDriver +@pytest.fixture def simple(**kwargs): - storage = application_model_storage(InMemoryModelDriver()) - storage.setup() - storage.blueprint.store(models.get_blueprint()) - storage.deployment.store(models.get_deployment()) + api_params = get_sqlite_api_params() + model_storage = application_model_storage(SQLAlchemyModelAPI, api_params=api_params) + model_storage.blueprint.store(models.get_blueprint()) + blueprint = model_storage.blueprint.get(models.BLUEPRINT_ID) + deployment = models.get_deployment(blueprint) + model_storage.deployment.store(deployment) + + ################################################################################# + # Creating a simple deployment with node -> node as a graph + + dependency_node = models.get_dependency_node(deployment) + model_storage.node.store(dependency_node) + storage_dependency_node = model_storage.node.get(dependency_node.id) + + dependency_node_instance = models.get_dependency_node_instance(storage_dependency_node) + model_storage.node_instance.store(dependency_node_instance) + storage_dependency_node_instance = model_storage.node_instance.get(dependency_node_instance.id) + + dependent_node = models.get_dependent_node(deployment) + model_storage.node.store(dependent_node) + storage_dependent_node = model_storage.node.get(dependent_node.id) + + dependent_node_instance = models.get_dependent_node_instance(storage_dependent_node) + model_storage.node_instance.store(dependent_node_instance) + storage_dependent_node_instance = model_storage.node_instance.get(dependent_node_instance.id) + + relationship = models.get_relationship(storage_dependent_node, storage_dependency_node) + model_storage.relationship.store(relationship) + storage_relationship = model_storage.relationship.get(relationship.id) + relationship_instance = models.get_relationship_instance( + relationship=storage_relationship, + target_instance=storage_dependency_node_instance, + 
source_instance=storage_dependent_node_instance + ) + model_storage.relationship_instance.store(relationship_instance) + final_kwargs = dict( name='simple_context', - model_storage=storage, + model_storage=model_storage, resource_storage=None, deployment_id=models.DEPLOYMENT_ID, workflow_id=models.WORKFLOW_ID, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d768e61/tests/mock/models.py ---------------------------------------------------------------------- diff --git a/tests/mock/models.py b/tests/mock/models.py index 327b0b9..bdcbed9 100644 --- a/tests/mock/models.py +++ b/tests/mock/models.py @@ -30,13 +30,13 @@ DEPENDENCY_NODE_ID = 'dependency_node' DEPENDENCY_NODE_INSTANCE_ID = 'dependency_node_instance' DEPENDENT_NODE_ID = 'dependent_node' DEPENDENT_NODE_INSTANCE_ID = 'dependent_node_instance' +RELATIONSHIP_ID = 'relationship' +RELATIONSHIP_INSTANCE_ID = 'relationship_instance' - -def get_dependency_node(): +def get_dependency_node(deployment): return models.Node( id=DEPENDENCY_NODE_ID, host_id=DEPENDENCY_NODE_ID, - blueprint_id=BLUEPRINT_ID, type='test_node_type', type_hierarchy=[], number_of_instances=1, @@ -44,28 +44,30 @@ def get_dependency_node(): deploy_number_of_instances=1, properties={}, operations=dict((key, {}) for key in operations.NODE_OPERATIONS), - relationships=[], min_number_of_instances=1, max_number_of_instances=1, + deployment_storage_id=deployment.storage_id ) -def get_dependency_node_instance(dependency_node=None): +def get_dependency_node_instance(dependency_node): return models.NodeInstance( id=DEPENDENCY_NODE_INSTANCE_ID, host_id=DEPENDENCY_NODE_INSTANCE_ID, - deployment_id=DEPLOYMENT_ID, runtime_properties={'ip': '1.1.1.1'}, version=None, - relationship_instances=[], - node=dependency_node or get_dependency_node() + node_storage_id=dependency_node.storage_id, + deployment_storage_id=dependency_node.deployment.storage_id, + state='', + scaling_groups={} ) def get_relationship(source=None, target=None): return 
models.Relationship( - source_id=source.id if source is not None else DEPENDENT_NODE_ID, - target_id=target.id if target is not None else DEPENDENCY_NODE_ID, + id=RELATIONSHIP_ID, + source_node_storage_id=source.storage_id, + target_node_storage_id=target.storage_id, source_interfaces={}, source_operations=dict((key, {}) for key in operations.RELATIONSHIP_OPERATIONS), target_interfaces={}, @@ -76,23 +78,21 @@ def get_relationship(source=None, target=None): ) -def get_relationship_instance(source_instance=None, target_instance=None, relationship=None): +def get_relationship_instance(source_instance, target_instance, relationship): return models.RelationshipInstance( - target_id=target_instance.id if target_instance else DEPENDENCY_NODE_INSTANCE_ID, - target_name='test_target_name', - source_id=source_instance.id if source_instance else DEPENDENT_NODE_INSTANCE_ID, - source_name='test_source_name', + id=RELATIONSHIP_INSTANCE_ID, type='some_type', - relationship=relationship or get_relationship(target_instance.node - if target_instance else None) + relationship_storage_id=relationship.storage_id, + target_node_instance_storage_id=target_instance.storage_id, + source_node_instance_storage_id=source_instance.storage_id, ) -def get_dependent_node(relationship=None): +def get_dependent_node(deployment): return models.Node( id=DEPENDENT_NODE_ID, + deployment_storage_id=deployment.storage_id, host_id=DEPENDENT_NODE_ID, - blueprint_id=BLUEPRINT_ID, type='test_node_type', type_hierarchy=[], number_of_instances=1, @@ -100,21 +100,21 @@ def get_dependent_node(relationship=None): deploy_number_of_instances=1, properties={}, operations=dict((key, {}) for key in operations.NODE_OPERATIONS), - relationships=[relationship or get_relationship()], min_number_of_instances=1, max_number_of_instances=1, ) -def get_dependent_node_instance(relationship_instance=None, dependent_node=None): +def get_dependent_node_instance(dependent_node): return models.NodeInstance( 
id=DEPENDENT_NODE_INSTANCE_ID, host_id=DEPENDENT_NODE_INSTANCE_ID, - deployment_id=DEPLOYMENT_ID, runtime_properties={}, version=None, - relationship_instances=[relationship_instance or get_relationship_instance()], - node=dependent_node or get_dependency_node() + node_storage_id=dependent_node.storage_id, + deployment_storage_id=dependent_node.deployment.storage_id, + state='', + scaling_groups={} ) @@ -130,25 +130,31 @@ def get_blueprint(): ) -def get_execution(): +def get_execution(deployment): return models.Execution( id=EXECUTION_ID, + deployment_storage_id=deployment.storage_id, status=models.Execution.STARTED, - deployment_id=DEPLOYMENT_ID, workflow_id=WORKFLOW_ID, - blueprint_id=BLUEPRINT_ID, started_at=datetime.utcnow(), parameters=None ) -def get_deployment(): +def get_deployment(blueprint): now = datetime.utcnow() return models.Deployment( id=DEPLOYMENT_ID, - description=None, + blueprint_storage_id=blueprint.storage_id, + description='', created_at=now, updated_at=now, - blueprint_id=BLUEPRINT_ID, - workflows={} + workflows={}, + inputs={}, + groups={}, + permalink='', + policy_triggers={}, + policy_types={}, + outputs={}, + scaling_groups={}, ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d768e61/tests/orchestrator/context/test_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py index 6b3e28d..ec13154 100644 --- a/tests/orchestrator/context/test_operation.py +++ b/tests/orchestrator/context/test_operation.py @@ -35,7 +35,9 @@ global_test_holder = {} @pytest.fixture def ctx(): - return mock.context.simple() + context = mock.context.simple() + yield context + context.model.drop() @pytest.fixture @@ -50,14 +52,13 @@ def executor(): def test_node_operation_task_execution(ctx, executor): operation_name = 'aria.interfaces.lifecycle.create' - node = mock.models.get_dependency_node() + node = 
ctx.model.node.get(mock.models.DEPENDENCY_NODE_ID) node.operations[operation_name] = { 'operation': op_path(my_operation, module_path=__name__) } - node_instance = mock.models.get_dependency_node_instance(node) - ctx.model.node.store(node) - ctx.model.node_instance.store(node_instance) + ctx.model.node.update(node) + node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) inputs = {'putput': True} @@ -91,25 +92,18 @@ def test_node_operation_task_execution(ctx, executor): def test_relationship_operation_task_execution(ctx, executor): operation_name = 'aria.interfaces.relationship_lifecycle.postconfigure' - dependency_node = mock.models.get_dependency_node() - dependency_node_instance = mock.models.get_dependency_node_instance() - relationship = mock.models.get_relationship(target=dependency_node) + relationship = ctx.model.relationship.get(mock.models.RELATIONSHIP_ID) relationship.source_operations[operation_name] = { 'operation': op_path(my_operation, module_path=__name__) } - relationship_instance = mock.models.get_relationship_instance( - target_instance=dependency_node_instance, - relationship=relationship) - dependent_node = mock.models.get_dependent_node() - dependent_node_instance = mock.models.get_dependent_node_instance( - relationship_instance=relationship_instance, - dependent_node=dependency_node) - ctx.model.node.store(dependency_node) - ctx.model.node_instance.store(dependency_node_instance) - ctx.model.relationship.store(relationship) - ctx.model.relationship_instance.store(relationship_instance) - ctx.model.node.store(dependent_node) - ctx.model.node_instance.store(dependent_node_instance) + ctx.model.relationship.update(relationship) + relationship_instance = ctx.model.relationship_instance.get( + mock.models.RELATIONSHIP_INSTANCE_ID) + + dependency_node = ctx.model.node.get(mock.models.DEPENDENCY_NODE_ID) + dependency_node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) + dependent_node = 
ctx.model.node.get(mock.models.DEPENDENT_NODE_ID) + dependent_node_instance = ctx.model.node_instance.get(mock.models.DEPENDENT_NODE_INSTANCE_ID) inputs = {'putput': True} http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d768e61/tests/orchestrator/context/test_toolbelt.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_toolbelt.py b/tests/orchestrator/context/test_toolbelt.py index 547e62b..480f289 100644 --- a/tests/orchestrator/context/test_toolbelt.py +++ b/tests/orchestrator/context/test_toolbelt.py @@ -33,7 +33,9 @@ global_test_holder = {} @pytest.fixture def workflow_context(): - return mock.context.simple() + context = mock.context.simple() + yield context + context.model.drop() @pytest.fixture @@ -45,31 +47,23 @@ def executor(): result.close() -def _create_simple_model_in_storage(workflow_context): - dependency_node = mock.models.get_dependency_node() - dependency_node_instance = mock.models.get_dependency_node_instance( - dependency_node=dependency_node) - relationship = mock.models.get_relationship(target=dependency_node) - relationship_instance = mock.models.get_relationship_instance( - target_instance=dependency_node_instance, relationship=relationship) - dependent_node = mock.models.get_dependent_node() - dependent_node_instance = mock.models.get_dependent_node_instance( - relationship_instance=relationship_instance, dependent_node=dependency_node) - workflow_context.model.node.store(dependency_node) - workflow_context.model.node_instance.store(dependency_node_instance) - workflow_context.model.relationship.store(relationship) - workflow_context.model.relationship_instance.store(relationship_instance) - workflow_context.model.node.store(dependent_node) - workflow_context.model.node_instance.store(dependent_node_instance) - return dependency_node, dependency_node_instance, \ - dependent_node, dependent_node_instance, \ - relationship, relationship_instance +def 
_get_elements(workflow_context): + dependency_node = workflow_context.model.node.get(mock.models.DEPENDENCY_NODE_ID) + dependency_node_instance = workflow_context.model.node_instance.get( + mock.models.DEPENDENCY_NODE_INSTANCE_ID) + dependent_node = workflow_context.model.node.get(mock.models.DEPENDENT_NODE_ID) + dependent_node_instance = workflow_context.model.node_instance.get( + mock.models.DEPENDENT_NODE_INSTANCE_ID) + relationship = workflow_context.model.relationship.get(mock.models.RELATIONSHIP_ID) + relationship_instance = workflow_context.model.relationship_instance.get( + mock.models.RELATIONSHIP_INSTANCE_ID) + return dependency_node, dependency_node_instance, dependent_node, dependent_node_instance, \ + relationship, relationship_instance def test_host_ip(workflow_context, executor): operation_name = 'aria.interfaces.lifecycle.create' - dependency_node, dependency_node_instance, _, _, _, _ = \ - _create_simple_model_in_storage(workflow_context) + dependency_node, dependency_node_instance, _, _, _, _ = _get_elements(workflow_context) dependency_node.operations[operation_name] = { 'operation': op_path(host_ip, module_path=__name__) @@ -96,7 +90,7 @@ def test_host_ip(workflow_context, executor): def test_dependent_node_instances(workflow_context, executor): operation_name = 'aria.interfaces.lifecycle.create' dependency_node, dependency_node_instance, _, dependent_node_instance, _, _ = \ - _create_simple_model_in_storage(workflow_context) + _get_elements(workflow_context) dependency_node.operations[operation_name] = { 'operation': op_path(dependent_nodes, module_path=__name__) @@ -116,14 +110,14 @@ def test_dependent_node_instances(workflow_context, executor): execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor) - assert list(global_test_holder.get('dependent_node_instances', [])) == \ - list([dependent_node_instance]) + assert global_test_holder.get('dependent_node_instances')[0].to_dict == \ + 
dependent_node_instance.to_dict def test_relationship_tool_belt(workflow_context, executor): operation_name = 'aria.interfaces.relationship_lifecycle.postconfigure' _, _, _, _, relationship, relationship_instance = \ - _create_simple_model_in_storage(workflow_context) + _get_elements(workflow_context) relationship.source_operations[operation_name] = { 'operation': op_path(relationship_operation, module_path=__name__) } @@ -152,6 +146,7 @@ def test_wrong_model_toolbelt(): with pytest.raises(RuntimeError): context.toolbelt(None) + @operation(toolbelt=True) def host_ip(toolbelt, **_): global_test_holder['host_ip'] = toolbelt.host_ip http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d768e61/tests/orchestrator/context/test_workflow.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_workflow.py b/tests/orchestrator/context/test_workflow.py index 258f0c5..4c4979f 100644 --- a/tests/orchestrator/context/test_workflow.py +++ b/tests/orchestrator/context/test_workflow.py @@ -19,9 +19,10 @@ import pytest from aria import application_model_storage from aria.orchestrator import context +from aria.storage.mapi.sql import SQLAlchemyModelAPI from tests.mock import models -from tests.storage import InMemoryModelDriver +from tests import storage as test_storage class TestWorkflowContext(object): @@ -57,8 +58,9 @@ class TestWorkflowContext(object): @pytest.fixture(scope='function') def storage(): - result = application_model_storage(InMemoryModelDriver()) - result.setup() + api_params = test_storage.get_sqlite_api_params() + result = application_model_storage(SQLAlchemyModelAPI, api_params=api_params) result.blueprint.store(models.get_blueprint()) - result.deployment.store(models.get_deployment()) + blueprint = result.blueprint.get(models.BLUEPRINT_ID) + result.deployment.store(models.get_deployment(blueprint)) return result