Storage is now SQL-based, with SQLAlchemy-based models
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/88bc5d18 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/88bc5d18 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/88bc5d18 Branch: refs/heads/ARIA-30-SQL-based-storage-implementation Commit: 88bc5d18037023eaa466a81cb883b3b14d44335e Parents: fe974e4 Author: mxmrlv <mxm...@gmail.com> Authored: Sun Nov 27 13:20:46 2016 +0200 Committer: mxmrlv <mxm...@gmail.com> Committed: Thu Dec 1 14:35:10 2016 +0200 ---------------------------------------------------------------------- aria/__init__.py | 43 +- aria/orchestrator/__init__.py | 4 +- aria/orchestrator/context/common.py | 2 +- aria/orchestrator/context/exceptions.py | 4 +- aria/orchestrator/context/operation.py | 8 +- aria/orchestrator/context/toolbelt.py | 13 +- aria/orchestrator/context/workflow.py | 20 +- aria/orchestrator/exceptions.py | 7 +- aria/orchestrator/workflows/api/task.py | 10 +- aria/orchestrator/workflows/builtin/heal.py | 25 +- aria/orchestrator/workflows/builtin/install.py | 7 +- .../orchestrator/workflows/builtin/uninstall.py | 5 +- .../orchestrator/workflows/builtin/workflows.py | 4 +- aria/orchestrator/workflows/core/task.py | 21 +- aria/storage/__init__.py | 379 ++------ aria/storage/api.py | 219 +++++ aria/storage/drivers.py | 416 --------- aria/storage/exceptions.py | 4 +- aria/storage/filesystem_api.py | 39 + aria/storage/mapi/__init__.py | 20 + aria/storage/mapi/filesystem.py | 118 +++ aria/storage/mapi/inmemory.py | 148 +++ aria/storage/mapi/sql.py | 369 ++++++++ aria/storage/models.py | 912 +++++++++++++------ aria/storage/rapi/__init__.py | 18 + aria/storage/rapi/filesystem.py | 119 +++ aria/storage/structures.py | 424 ++++----- requirements.txt | 1 + tests/mock/context.py | 50 +- tests/mock/models.py | 68 +- tests/orchestrator/context/test_operation.py | 36 +- tests/orchestrator/context/test_toolbelt.py 
| 47 +- tests/orchestrator/context/test_workflow.py | 10 +- tests/orchestrator/workflows/api/test_task.py | 68 +- .../orchestrator/workflows/builtin/__init__.py | 35 +- .../workflows/builtin/test_execute_operation.py | 11 +- .../orchestrator/workflows/builtin/test_heal.py | 18 +- .../workflows/builtin/test_install.py | 14 +- .../workflows/builtin/test_uninstall.py | 12 +- .../orchestrator/workflows/core/test_engine.py | 71 +- tests/orchestrator/workflows/core/test_task.py | 20 +- .../test_task_graph_into_exececution_graph.py | 10 +- tests/requirements.txt | 2 +- tests/storage/__init__.py | 38 +- tests/storage/test_drivers.py | 135 --- tests/storage/test_field.py | 124 --- tests/storage/test_model_storage.py | 167 ++-- tests/storage/test_models.py | 364 -------- tests/storage/test_models_api.py | 70 -- tests/storage/test_resource_storage.py | 57 +- 50 files changed, 2318 insertions(+), 2468 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/__init__.py ---------------------------------------------------------------------- diff --git a/aria/__init__.py b/aria/__init__.py index 3f81f98..6e810f0 100644 --- a/aria/__init__.py +++ b/aria/__init__.py @@ -23,7 +23,6 @@ import pkgutil from .VERSION import version as __version__ from .orchestrator.decorators import workflow, operation -from .storage import ModelStorage, ResourceStorage, models, ModelDriver, ResourceDriver from . 
import ( utils, parser, @@ -58,37 +57,37 @@ def install_aria_extensions(): del sys.modules[module_name] -def application_model_storage(driver): +def application_model_storage(api, api_params=None): """ Initiate model storage for the supplied storage driver """ - - assert isinstance(driver, ModelDriver) - if driver not in _model_storage: - _model_storage[driver] = ModelStorage( - driver, model_classes=[ - models.Node, - models.NodeInstance, - models.Plugin, - models.Blueprint, - models.Snapshot, - models.Deployment, - models.DeploymentUpdate, - models.DeploymentModification, - models.Execution, - models.ProviderContext, - models.Task, - ]) - return _model_storage[driver] + models = [ + storage.models.Blueprint, + storage.models.Deployment, + storage.models.Node, + storage.models.NodeInstance, + storage.models.Relationship, + storage.models.RelationshipInstance, + storage.models.Plugin, + storage.models.Snapshot, + storage.models.DeploymentUpdate, + storage.models.DeploymentUpdateStep, + storage.models.DeploymentModification, + storage.models.Execution, + storage.models.ProviderContext, + storage.models.Task, + ] + # if api not in _model_storage: + _model_storage[api] = storage.ModelStorage(api, items=models, api_params=api_params or {}) + return _model_storage[api] def application_resource_storage(driver): """ Initiate resource storage for the supplied storage driver """ - assert isinstance(driver, ResourceDriver) if driver not in _resource_storage: - _resource_storage[driver] = ResourceStorage( + _resource_storage[driver] = storage.ResourceStorage( driver, resources=[ 'blueprint', http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/orchestrator/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/__init__.py b/aria/orchestrator/__init__.py index a5aeec7..90d6442 100644 --- a/aria/orchestrator/__init__.py +++ b/aria/orchestrator/__init__.py @@ -12,7 +12,9 @@ # WITHOUT WARRANTIES 
OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +""" +Aria orchestrator +""" from .decorators import workflow, operation from . import ( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index f2bf83b..7b65e2b 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -79,7 +79,7 @@ class BaseContext(logger.LoggerMixin): """ The blueprint model """ - return self.model.blueprint.get(self.deployment.blueprint_id) + return self.deployment.blueprint @property def deployment(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/orchestrator/context/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/exceptions.py b/aria/orchestrator/context/exceptions.py index 6704bbc..fe762e1 100644 --- a/aria/orchestrator/context/exceptions.py +++ b/aria/orchestrator/context/exceptions.py @@ -12,7 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- +""" +Context based exceptions +""" from ..exceptions import OrchestratorError http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index bf3686d..f522111 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -84,7 +84,7 @@ class RelationshipOperationContext(BaseOperationContext): The source node :return: """ - return self.model.node.get(self.relationship.source_id) + return self.relationship.source_node @property def source_node_instance(self): @@ -92,7 +92,7 @@ class RelationshipOperationContext(BaseOperationContext): The source node instance :return: """ - return self.model.node_instance.get(self.relationship_instance.source_id) + return self.relationship_instance.source_node_instance @property def target_node(self): @@ -100,7 +100,7 @@ class RelationshipOperationContext(BaseOperationContext): The target node :return: """ - return self.model.node.get(self.relationship.target_id) + return self.relationship.target_node @property def target_node_instance(self): @@ -108,7 +108,7 @@ class RelationshipOperationContext(BaseOperationContext): The target node instance :return: """ - return self.model.node_instance.get(self._actor.target_id) + return self.relationship_instance.target_node_instance @property def relationship(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/orchestrator/context/toolbelt.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/toolbelt.py b/aria/orchestrator/context/toolbelt.py index 0aad89c..ae0e1ff 100644 --- a/aria/orchestrator/context/toolbelt.py +++ b/aria/orchestrator/context/toolbelt.py @@ -33,13 +33,10 @@ class NodeToolBelt(object): :return: """ assert 
isinstance(self._op_context, operation.NodeOperationContext) - node_instances = self._op_context.model.node_instance.iter( - filters={'deployment_id': self._op_context.deployment.id} - ) - for node_instance in node_instances: - for relationship_instance in node_instance.relationship_instances: - if relationship_instance.target_id == self._op_context.node_instance.id: - yield node_instance + filters = {'target_node_instance_storage_id': self._op_context.node_instance.storage_id} + for relationship_instance in \ + self._op_context.model.relationship_instance.iter(filters=filters): + yield relationship_instance.source_node_instance @property def host_ip(self): @@ -48,7 +45,7 @@ class NodeToolBelt(object): :return: """ assert isinstance(self._op_context, operation.NodeOperationContext) - host_id = self._op_context._actor.host_id + host_id = self._op_context.node_instance.host_id host_instance = self._op_context.model.node_instance.get(host_id) return host_instance.runtime_properties.get('ip') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py index 3dc222b..8797271 100644 --- a/aria/orchestrator/context/workflow.py +++ b/aria/orchestrator/context/workflow.py @@ -19,6 +19,7 @@ Workflow and operation contexts import threading from contextlib import contextmanager +from datetime import datetime from aria import storage @@ -49,13 +50,14 @@ class WorkflowContext(BaseContext): def _create_execution(self): execution_cls = self.model.execution.model_cls + now = datetime.utcnow() execution = self.model.execution.model_cls( id=self._execution_id, - deployment_id=self.deployment.id, workflow_id=self._workflow_id, - blueprint_id=self.blueprint.id, + created_at=now, status=execution_cls.PENDING, parameters=self.parameters, + 
deployment_storage_id=self.deployment.storage_id ) self.model.execution.store(execution) @@ -64,19 +66,27 @@ class WorkflowContext(BaseContext): """ Iterator over nodes """ - return self.model.node.iter(filters={'blueprint_id': self.blueprint.id}) + return self.model.node.iter( + filters={ + 'deployment_storage_id': self.deployment.storage_id + } + ) @property def node_instances(self): """ Iterator over node instances """ - return self.model.node_instance.iter(filters={'deployment_id': self.deployment.id}) + return self.model.node_instance.iter( + filters={ + 'deployment_storage_id': self.deployment.storage_id + } + ) class _CurrentContext(threading.local): """ - Provides thread-level context, which sugarcoats the task api. + Provides thread-level context, which sugarcoats the task mapi. """ def __init__(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/orchestrator/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py index 75b37cf..1a48194 100644 --- a/aria/orchestrator/exceptions.py +++ b/aria/orchestrator/exceptions.py @@ -12,9 +12,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- +""" +Orchestrator based exceptions +""" from aria.exceptions import AriaError class OrchestratorError(AriaError): + """ + Orchestrator based exception + """ pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index 4d36725..358315c 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -18,7 +18,7 @@ Provides the tasks to be entered into the task graph """ from uuid import uuid4 -import aria +from aria import storage from ... import context from .. import exceptions @@ -75,8 +75,8 @@ class OperationTask(BaseTask): :param actor: the operation host on which this operation is registered. :param inputs: operation inputs. """ - assert isinstance(actor, (aria.storage.models.NodeInstance, - aria.storage.models.RelationshipInstance)) + assert isinstance(actor, (storage.models.NodeInstance, + storage.models.RelationshipInstance)) super(OperationTask, self).__init__() self.actor = actor self.name = '{name}.{actor.id}'.format(name=name, actor=actor) @@ -97,7 +97,7 @@ class OperationTask(BaseTask): :param instance: the node of which this operation belongs to. :param name: the name of the operation. 
""" - assert isinstance(instance, aria.storage.models.NodeInstance) + assert isinstance(instance, storage.models.NodeInstance) operation_details = instance.node.operations[name] operation_inputs = operation_details.get('inputs', {}) operation_inputs.update(inputs or {}) @@ -119,7 +119,7 @@ class OperationTask(BaseTask): with 'source_operations' and 'target_operations' :param inputs any additional inputs to the operation """ - assert isinstance(instance, aria.storage.models.RelationshipInstance) + assert isinstance(instance, storage.models.RelationshipInstance) if operation_end not in [cls.TARGET_OPERATION, cls.SOURCE_OPERATION]: raise exceptions.TaskException('The operation end should be {0} or {1}'.format( cls.TARGET_OPERATION, cls.SOURCE_OPERATION http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/orchestrator/workflows/builtin/heal.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/heal.py b/aria/orchestrator/workflows/builtin/heal.py index dbfc14e..650e664 100644 --- a/aria/orchestrator/workflows/builtin/heal.py +++ b/aria/orchestrator/workflows/builtin/heal.py @@ -84,16 +84,19 @@ def heal_uninstall(ctx, graph, failing_node_instances, targeted_node_instances): # create dependencies between the node instance sub workflow for node_instance in failing_node_instances: node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] - for relationship_instance in reversed(node_instance.relationship_instances): - graph.add_dependency(node_instance_sub_workflows[relationship_instance.target_id], - node_instance_sub_workflow) + for relationship_instance in reversed(node_instance.relationship_instance_source): + graph.add_dependency( + node_instance_sub_workflows[relationship_instance.target_node_instance.id], + node_instance_sub_workflow) # Add operations for intact nodes depending on a node instance belonging to node_instances for node_instance in 
targeted_node_instances: node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] - for relationship_instance in reversed(node_instance.relationship_instances): - target_node_instance = ctx.model.node_instance.get(relationship_instance.target_id) + for relationship_instance in reversed(node_instance.relationship_instance_source): + + target_node_instance = \ + ctx.model.node_instance.get(relationship_instance.target_node_instance.id) target_node_instance_subgraph = node_instance_sub_workflows[target_node_instance.id] graph.add_dependency(target_node_instance_subgraph, node_instance_sub_workflow) @@ -134,9 +137,10 @@ def heal_install(ctx, graph, failing_node_instances, targeted_node_instances): # create dependencies between the node instance sub workflow for node_instance in failing_node_instances: node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] - if node_instance.relationship_instances: - dependencies = [node_instance_sub_workflows[relationship_instance.target_id] - for relationship_instance in node_instance.relationship_instances] + if node_instance.relationship_instance_source: + dependencies = \ + [node_instance_sub_workflows[relationship_instance.target_node_instance.id] + for relationship_instance in node_instance.relationship_instance_source] graph.add_dependency(node_instance_sub_workflow, dependencies) # Add operations for intact nodes depending on a node instance @@ -144,8 +148,9 @@ def heal_install(ctx, graph, failing_node_instances, targeted_node_instances): for node_instance in targeted_node_instances: node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] - for relationship_instance in node_instance.relationship_instances: - target_node_instance = ctx.model.node_instance.get(relationship_instance.target_id) + for relationship_instance in node_instance.relationship_instance_source: + target_node_instance = ctx.model.node_instance.get( + relationship_instance.target_node_instance.id) 
target_node_instance_subworkflow = node_instance_sub_workflows[target_node_instance.id] graph.add_dependency(node_instance_sub_workflow, target_node_instance_subworkflow) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/orchestrator/workflows/builtin/install.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/install.py b/aria/orchestrator/workflows/builtin/install.py index 0ab3ad6..634811f 100644 --- a/aria/orchestrator/workflows/builtin/install.py +++ b/aria/orchestrator/workflows/builtin/install.py @@ -47,7 +47,8 @@ def install(ctx, graph, node_instances=(), node_instance_sub_workflows=None): # create dependencies between the node instance sub workflow for node_instance in node_instances: node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] - if node_instance.relationship_instances: - dependencies = [node_instance_sub_workflows[relationship_instance.target_id] - for relationship_instance in node_instance.relationship_instances] + if node_instance.relationship_instance_source: + dependencies = [ + node_instance_sub_workflows[relationship_instance.target_node_instance.id] + for relationship_instance in node_instance.relationship_instance_source] graph.add_dependency(node_instance_sub_workflow, dependencies) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/orchestrator/workflows/builtin/uninstall.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/uninstall.py b/aria/orchestrator/workflows/builtin/uninstall.py index f4e965c..80fdc4e 100644 --- a/aria/orchestrator/workflows/builtin/uninstall.py +++ b/aria/orchestrator/workflows/builtin/uninstall.py @@ -47,6 +47,7 @@ def uninstall(ctx, graph, node_instances=(), node_instance_sub_workflows=None): # create dependencies between the node instance sub workflow for node_instance in node_instances: 
node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] - for relationship_instance in reversed(node_instance.relationship_instances): - graph.add_dependency(node_instance_sub_workflows[relationship_instance.target_id], + for relationship_instance in reversed(node_instance.relationship_instance_source): + target_id = relationship_instance.target_node_instance.id + graph.add_dependency(node_instance_sub_workflows[target_id], node_instance_sub_workflow) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/orchestrator/workflows/builtin/workflows.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/workflows.py b/aria/orchestrator/workflows/builtin/workflows.py index 0eb8c34..02bfaf1 100644 --- a/aria/orchestrator/workflows/builtin/workflows.py +++ b/aria/orchestrator/workflows/builtin/workflows.py @@ -179,8 +179,8 @@ def relationships_tasks(graph, operation_name, node_instance): :return: """ relationships_groups = groupby( - node_instance.relationship_instances, - key=lambda relationship_instance: relationship_instance.relationship.target_id) + node_instance.relationship_instance_source, + key=lambda relationship_instance: relationship_instance.target_node_instance.id) sub_tasks = [] for _, (_, relationship_group) in enumerate(relationships_groups): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py index a583cfc..fd00307 100644 --- a/aria/orchestrator/workflows/core/task.py +++ b/aria/orchestrator/workflows/core/task.py @@ -107,6 +107,15 @@ class OperationTask(BaseTask): super(OperationTask, self).__init__(id=api_task.id, **kwargs) self._workflow_context = api_task._workflow_context task_model = 
api_task._workflow_context.model.task.model_cls + + if isinstance(api_task.actor, models.NodeInstance): + context_class = operation_context.NodeOperationContext + elif isinstance(api_task.actor, models.RelationshipInstance): + context_class = operation_context.RelationshipOperationContext + else: + raise RuntimeError('No operation context could be created for {0}' + .format(api_task.actor.model_cls)) + operation_task = task_model( id=api_task.id, name=api_task.name, @@ -117,21 +126,13 @@ class OperationTask(BaseTask): execution_id=self._workflow_context._execution_id, max_attempts=api_task.max_attempts, retry_interval=api_task.retry_interval, - ignore_failure=api_task.ignore_failure + ignore_failure=api_task.ignore_failure, ) - - if isinstance(api_task.actor, models.NodeInstance): - context_class = operation_context.NodeOperationContext - elif isinstance(api_task.actor, models.RelationshipInstance): - context_class = operation_context.RelationshipOperationContext - else: - raise RuntimeError('No operation context could be created for {0}' - .format(api_task.actor.model_cls)) + self._workflow_context.model.task.store(operation_task) self._ctx = context_class(name=api_task.name, workflow_context=self._workflow_context, task=operation_task) - self._workflow_context.model.task.store(operation_task) self._task_id = operation_task.id self._update_fields = None http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/storage/__init__.py ---------------------------------------------------------------------- diff --git a/aria/storage/__init__.py b/aria/storage/__init__.py index 2d142a5..6740cd0 100644 --- a/aria/storage/__init__.py +++ b/aria/storage/__init__.py @@ -20,14 +20,14 @@ Path: aria.storage Storage package is a generic abstraction over different storage types. We define this abstraction with the following components: -1. storage: simple api to use -2. driver: implementation of the database client api. +1. storage: simple mapi to use +2. 
driver: implementation of the database client mapi. 3. model: defines the structure of the table/document. 4. field: defines a field/item in the model. API: * application_storage_factory - function, default Aria storage factory. - * Storage - class, simple storage api. + * Storage - class, simple storage mapi. * models - module, default Aria standard models. * structures - module, default Aria structures - holds the base model, and different fields types. @@ -37,354 +37,93 @@ API: * drivers - module, a pool of Aria standard drivers. * StorageDriver - class, abstract model implementation. """ -# todo: rewrite the above package documentation -# (something like explaning the two types of storage - models and resources) -from collections import namedtuple - -from .structures import Storage, Field, Model, IterField, PointerField -from .drivers import ( - ModelDriver, - ResourceDriver, - FileSystemResourceDriver, - FileSystemModelDriver, +from aria.logger import LoggerMixin +from . import ( + models, + exceptions, + api as storage_api, + structures ) -from . import models, exceptions + __all__ = ( 'ModelStorage', - 'ResourceStorage', - 'FileSystemModelDriver', 'models', 'structures', - 'Field', - 'IterField', - 'PointerField', - 'Model', - 'drivers', - 'ModelDriver', - 'ResourceDriver', - 'FileSystemResourceDriver', ) -# todo: think about package output api's... -# todo: in all drivers name => entry_type -# todo: change in documentation str => basestring -class ModelStorage(Storage): +class Storage(LoggerMixin): """ - Managing the models storage. + Represents the storage """ - def __init__(self, driver, model_classes=(), **kwargs): - """ - Simple storage client api for Aria applications. - The storage instance defines the tables/documents/code api. - - :param ModelDriver driver: model storage driver. - :param model_classes: the models to register. 
- """ - assert isinstance(driver, ModelDriver) - super(ModelStorage, self).__init__(driver, model_classes, **kwargs) - - def __getattr__(self, table): - """ - getattr is a shortcut to simple api - - for Example: - >> storage = ModelStorage(driver=FileSystemModelDriver('/tmp')) - >> node_table = storage.node - >> for node in node_table: - >> print node - - :param str table: table name to get - :return: a storage object that mapped to the table name - """ - return super(ModelStorage, self).__getattr__(table) - - def register(self, model_cls): - """ - Registers the model type in the resource storage manager. - :param model_cls: the model to register. - """ - model_name = generate_lower_name(model_cls) - model_api = _ModelApi(model_name, self.driver, model_cls) - self.registered[model_name] = model_api - - for pointer_schema_register in model_api.pointer_mapping.values(): - model_cls = pointer_schema_register.model_cls - self.register(model_cls) - -_Pointer = namedtuple('_Pointer', 'name, is_iter') - - -class _ModelApi(object): - def __init__(self, name, driver, model_cls): - """ - Managing the model in the storage, using the driver. - - :param basestring name: the name of the model. - :param ModelDriver driver: the driver which supports this model in the storage. - :param Model model_cls: table/document class model. 
- """ - assert isinstance(driver, ModelDriver) - assert issubclass(model_cls, Model) - self.name = name - self.driver = driver - self.model_cls = model_cls - self.pointer_mapping = {} - self._setup_pointers_mapping() - - def _setup_pointers_mapping(self): - for field_name, field_cls in vars(self.model_cls).items(): - if not(isinstance(field_cls, PointerField) and field_cls.type): - continue - pointer_key = _Pointer(field_name, is_iter=isinstance(field_cls, IterField)) - self.pointer_mapping[pointer_key] = self.__class__( - name=generate_lower_name(field_cls.type), - driver=self.driver, - model_cls=field_cls.type) - - def __iter__(self): - return self.iter() + def __init__(self, api, items=(), api_params=None, **kwargs): + self._api_params = api_params or {} + super(Storage, self).__init__(**kwargs) + self.api = api + self.registered = {} + for item in items: + self.register(item) + self.logger.debug('{name} object is ready: {0!r}'.format( + self, name=self.__class__.__name__)) def __repr__(self): - return '{self.name}(driver={self.driver}, model={self.model_cls})'.format(self=self) - - def create(self): - """ - Creates the model in the storage. - """ - with self.driver as connection: - connection.create(self.name) - - def get(self, entry_id, **kwargs): - """ - Getter for the model from the storage. - - :param basestring entry_id: the id of the table/document. - :return: model instance - :rtype: Model - """ - with self.driver as connection: - data = connection.get( - name=self.name, - entry_id=entry_id, - **kwargs) - data.update(self._get_pointers(data, **kwargs)) - return self.model_cls(**data) + return '{name}(api={self.api})'.format(name=self.__class__.__name__, self=self) - def store(self, entry, **kwargs): - """ - Setter for the model in the storage. - - :param Model entry: the table/document to store. 
- """ - assert isinstance(entry, self.model_cls) - with self.driver as connection: - data = entry.fields_dict - data.update(self._store_pointers(data, **kwargs)) - connection.store( - name=self.name, - entry_id=entry.id, - entry=data, - **kwargs) - - def delete(self, entry_id, **kwargs): - """ - Delete the model from storage. - - :param basestring entry_id: id of the entity to delete from storage. - """ - entry = self.get(entry_id) - with self.driver as connection: - self._delete_pointers(entry, **kwargs) - connection.delete( - name=self.name, - entry_id=entry_id, - **kwargs) - - def iter(self, **kwargs): - """ - Generator over the entries of model in storage. - """ - with self.driver as connection: - for data in connection.iter(name=self.name, **kwargs): - data.update(self._get_pointers(data, **kwargs)) - yield self.model_cls(**data) + def __getattr__(self, item): + try: + return self.registered[item] + except KeyError: + return super(Storage, self).__getattribute__(item) - def update(self, entry_id, **kwargs): + def register(self, entry): """ - Updates and entry in storage. - - :param str entry_id: the id of the table/document. - :param kwargs: the fields to update. 
+ Register the entry to the storage + :param name: :return: """ - with self.driver as connection: - connection.update( - name=self.name, - entry_id=entry_id, - **kwargs - ) - - def _get_pointers(self, data, **kwargs): - pointers = {} - for field, schema in self.pointer_mapping.items(): - if field.is_iter: - pointers[field.name] = [ - schema.get(entry_id=pointer_id, **kwargs) - for pointer_id in data[field.name] - if pointer_id] - elif data[field.name]: - pointers[field.name] = schema.get(entry_id=data[field.name], **kwargs) - return pointers - - def _store_pointers(self, data, **kwargs): - pointers = {} - for field, model_api in self.pointer_mapping.items(): - if field.is_iter: - pointers[field.name] = [] - for iter_entity in data[field.name]: - pointers[field.name].append(iter_entity.id) - model_api.store(iter_entity, **kwargs) - else: - pointers[field.name] = data[field.name].id - model_api.store(data[field.name], **kwargs) - return pointers + raise NotImplementedError('Subclass must implement abstract register method') - def _delete_pointers(self, entry, **kwargs): - for field, schema in self.pointer_mapping.items(): - if field.is_iter: - for iter_entry in getattr(entry, field.name): - schema.delete(iter_entry.id, **kwargs) - else: - schema.delete(getattr(entry, field.name).id, **kwargs) - -class ResourceApi(object): +class ResourceStorage(Storage): """ - Managing the resource in the storage, using the driver. - - :param basestring name: the name of the resource. - :param ResourceDriver driver: the driver which supports this resource in the storage. + Represents resource storage. """ - def __init__(self, driver, resource_name): - """ - Managing the resources in the storage, using the driver. - - :param ResourceDriver driver: the driver which supports this model in the storage. - :param basestring resource_name: the type of the entry this resourceAPI manages. 
- """ - assert isinstance(driver, ResourceDriver) - self.driver = driver - self.resource_name = resource_name - - def __repr__(self): - return '{name}(driver={self.driver}, resource={self.resource_name})'.format( - name=self.__class__.__name__, self=self) - - def create(self): - """ - Create the resource dir in the storage. - """ - with self.driver as connection: - connection.create(self.resource_name) - - def data(self, entry_id, path=None, **kwargs): + def register(self, name): """ - Retrieve the content of a storage resource. - - :param basestring entry_id: the id of the entry. - :param basestring path: path of the resource on the storage. - :param kwargs: resources to be passed to the driver.. - :return the content of a single file: - """ - with self.driver as connection: - return connection.data( - entry_type=self.resource_name, - entry_id=entry_id, - path=path, - **kwargs) - - def download(self, entry_id, destination, path=None, **kwargs): - """ - Download a file/dir from the resource storage. - - :param basestring entry_id: the id of the entry. - :param basestring destination: the destination of the file/dir. - :param basestring path: path of the resource on the storage. - """ - with self.driver as connection: - connection.download( - entry_type=self.resource_name, - entry_id=entry_id, - destination=destination, - path=path, - **kwargs) - - def upload(self, entry_id, source, path=None, **kwargs): - """ - Upload a file/dir from the resource storage. - - :param basestring entry_id: the id of the entry. - :param basestring source: the source path of the file to upload. - :param basestring path: the destination of the file, relative to the root dir - of the resource + Register the resource type to resource storage. 
+ :param name: + :return: """ - with self.driver as connection: - connection.upload( - entry_type=self.resource_name, - entry_id=entry_id, - source=source, - path=path, - **kwargs) + self.registered[name] = self.api(name=name, **self._api_params) + self.registered[name].create() + self.logger.debug('setup {name} in storage {self!r}'.format(name=name, self=self)) -def generate_lower_name(model_cls): - """ - Generates the name of the class from the class object. e.g. SomeClass -> some_class - :param model_cls: the class to evaluate. - :return: lower name - :rtype: basestring - """ - return ''.join( - character if character.islower() else '_{0}'.format(character.lower()) - for character in model_cls.__name__)[1:] - - -class ResourceStorage(Storage): +class ModelStorage(Storage): """ - Managing the resource storage. + Represents model storage. """ - def __init__(self, driver, resources=(), **kwargs): - """ - Simple storage client api for Aria applications. - The storage instance defines the tables/documents/code api. - - :param ResourceDriver driver: resource storage driver - :param resources: the resources to register. - """ - assert isinstance(driver, ResourceDriver) - super(ResourceStorage, self).__init__(driver, resources, **kwargs) - - def register(self, resource): + def register(self, model): """ - Registers the resource type in the resource storage manager. - :param resource: the resource to register. + Register the model into the model storage. + :param model: the model to register. 
+ :return: """ - self.registered[resource] = ResourceApi(self.driver, resource_name=resource) + model_name = storage_api.generate_lower_name(model) + if model_name in self.registered: + self.logger.debug('{name} in already storage {self!r}'.format(name=model_name, + self=self)) + return + self.registered[model_name] = self.api(name=model_name, model_cls=model, **self._api_params) + self.registered[model_name].create() + self.logger.debug('setup {name} in storage {self!r}'.format(name=model_name, self=self)) - def __getattr__(self, resource): + def drop(self): """ - getattr is a shortcut to simple api - - for Example: - >> storage = ResourceStorage(driver=FileSystemResourceDriver('/tmp')) - >> blueprint_resources = storage.blueprint - >> blueprint_resources.download(blueprint_id, destination='~/blueprint/') - - :param str resource: resource name to download - :return: a storage object that mapped to the resource name - :rtype: ResourceApi + Drop all the tables from the model. + :return: """ - return super(ResourceStorage, self).__getattr__(resource) + for mapi in self.registered.values(): + mapi.drop() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/storage/api.py ---------------------------------------------------------------------- diff --git a/aria/storage/api.py b/aria/storage/api.py new file mode 100644 index 0000000..7bdbd5d --- /dev/null +++ b/aria/storage/api.py @@ -0,0 +1,219 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +General storage API +""" +from contextlib import contextmanager + +from . import exceptions + + +class StorageAPI(object): + """ + General storage Base API + """ + def create(self, **kwargs): + """ + Create a storage API. + :param kwargs: + :return: + """ + raise NotImplementedError('Subclass must implement abstract create method') + + @contextmanager + def connect(self): + """ + Establishes a connection and destroys it after use. + :return: + """ + try: + self._establish_connection() + yield self + except BaseException as e: + raise exceptions.StorageError(str(e)) + finally: + self._destroy_connection() + + def _establish_connection(self): + """ + Establish a connection. Used in the 'connect' contextmanager. + :return: + """ + pass + + def _destroy_connection(self): + """ + Destroy a connection. Used in the 'connect' contextmanager. + :return: + """ + pass + + def __getattr__(self, item): + try: + return self.registered[item] + except KeyError: + return super(StorageAPI, self).__getattribute__(item) + + +class ModelAPI(StorageAPI): + """ + A Base object for the model. 
+ """ + def __init__(self, model_cls, name=None, **kwargs): + """ + Base model API + + :param model_cls: the representing class of the model + :param str name: the name of the model + :param kwargs: + """ + super(ModelAPI, self).__init__(**kwargs) + self._model_cls = model_cls + self._name = name or generate_lower_name(model_cls) + + @property + def name(self): + """ + The name of the class + :return: name of the class + """ + return self._name + + @property + def model_cls(self): + """ + The class representing the model + :return: + """ + return self._model_cls + + def get(self, entry_id, filters=None, **kwargs): + """ + Get entry from storage. + + :param entry_id: + :param kwargs: + :return: + """ + raise NotImplementedError('Subclass must implement abstract get method') + + def store(self, entry, **kwargs): + """ + Store entry in storage + + :param entry: + :param kwargs: + :return: + """ + raise NotImplementedError('Subclass must implement abstract store method') + + def delete(self, entry_id, **kwargs): + """ + Delete entry from storage. + + :param entry_id: + :param kwargs: + :return: + """ + raise NotImplementedError('Subclass must implement abstract delete method') + + def __iter__(self): + return self.iter() + + def iter(self, **kwargs): + """ + Iterate over the entries in storage. + + :param kwargs: + :return: + """ + raise NotImplementedError('Subclass must implement abstract iter method') + + def update(self, entry, **kwargs): + """ + Update entry in storage. + + :param entry: + :param kwargs: + :return: + """ + raise NotImplementedError('Subclass must implement abstract update method') + + +class ResourceAPI(StorageAPI): + """ + A Base object for the resource. 
+ """ + def __init__(self, name): + """ + Base resource API + :param str name: the resource type + """ + self._name = name + + @property + def name(self): + """ + The name of the resource + :return: + """ + return self._name + + def data(self, entry_id, path=None, **kwargs): + """ + Get a bytestream from the storage. + + :param entry_id: + :param path: + :param kwargs: + :return: + """ + raise NotImplementedError('Subclass must implement abstract data method') + + def download(self, entry_id, destination, path=None, **kwargs): + """ + Download a resource from the storage. + + :param entry_id: + :param destination: + :param path: + :param kwargs: + :return: + """ + raise NotImplementedError('Subclass must implement abstract download method') + + def upload(self, entry_id, source, path=None, **kwargs): + """ + Upload a resource to the storage. + + :param entry_id: + :param source: + :param path: + :param kwargs: + :return: + """ + raise NotImplementedError('Subclass must implement abstract upload method') + + +def generate_lower_name(model_cls): + """ + Generates the name of the class from the class object. e.g. SomeClass -> some_class + :param model_cls: the class to evaluate. + :return: lower name + :rtype: basestring + """ + return ''.join( + character if character.islower() else '_{0}'.format(character.lower()) + for character in model_cls.__name__)[1:] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/storage/drivers.py ---------------------------------------------------------------------- diff --git a/aria/storage/drivers.py b/aria/storage/drivers.py deleted file mode 100644 index 1f96956..0000000 --- a/aria/storage/drivers.py +++ /dev/null @@ -1,416 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. 
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Aria's storage.drivers module -Path: aria.storage.driver - -drivers module holds a generic abstract implementation of drivers. - -classes: - * Driver - abstract storage driver implementation. - * ModelDriver - abstract model base storage driver. - * ResourceDriver - abstract resource base storage driver. - * FileSystemModelDriver - file system implementation for model storage driver. - * FileSystemResourceDriver - file system implementation for resource storage driver. -""" - -import distutils.dir_util # pylint: disable=no-name-in-module, import-error -import os -import shutil -from functools import partial -from multiprocessing import RLock - -import jsonpickle - -from ..logger import LoggerMixin -from .exceptions import StorageError - -__all__ = ( - 'ModelDriver', - 'FileSystemModelDriver', - 'ResourceDriver', - 'FileSystemResourceDriver', -) - - -class Driver(LoggerMixin): - """ - Driver: storage driver context manager - abstract driver implementation. - In the implementation level, It is a good practice to raise StorageError on Errors. - """ - - def __enter__(self): - """ - Context manager entry method, executes connect. - :return: context manager instance - :rtype: Driver - """ - self.connect() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """ - Context manager exit method, executes disconnect. 
- """ - self.disconnect() - if not exc_type: - return - # self.logger.debug( - # '{name} had an error'.format(name=self.__class__.__name__), - # exc_info=(exc_type, exc_val, exc_tb)) - if StorageError in exc_type.mro(): - return - raise StorageError('Exception had occurred, {type}: {message}'.format( - type=exc_type, message=str(exc_val))) - - def connect(self): - """ - Open storage connection. - In some cases, This method can get the connection from a connection pool. - """ - pass - - def disconnect(self): - """ - Close storage connection. - In some cases, This method can release the connection to the connection pool. - """ - pass - - def create(self, name, *args, **kwargs): - """ - Create table/document in storage by name. - :param str name: name of table/document in storage. - """ - pass - - -class ModelDriver(Driver): - """ - ModelDriver context manager. - Base Driver for Model based storage. - """ - - def get(self, name, entry_id, **kwargs): - """ - Getter from storage. - :param str name: name of table/document in storage. - :param str entry_id: id of the document to get from storage. - :return: value of entity from the storage. - """ - raise NotImplementedError('Subclass must implement abstract get method') - - def delete(self, name, entry_id, **kwargs): - """ - Delete from storage. - :param str name: name of table/document in storage. - :param str entry_id: id of the entity to delete from storage. - :param dict kwargs: extra kwargs if needed. - """ - raise NotImplementedError('Subclass must implement abstract delete method') - - def store(self, name, entry_id, entry, **kwargs): - """ - Setter to storage. - :param str name: name of table/document in storage. - :param str entry_id: id of the entity to store in the storage. - :param dict entry: content to store. - """ - raise NotImplementedError('Subclass must implement abstract store method') - - def iter(self, name, **kwargs): - """ - Generator over the entries of table/document in storage. 
- :param str name: name of table/document/file in storage to iter over. - """ - raise NotImplementedError('Subclass must implement abstract iter method') - - def update(self, name, entry_id, **kwargs): - """ - Updates and entry in storage. - - :param str name: name of table/document in storage. - :param str entry_id: id of the document to get from storage. - :param kwargs: the fields to update. - :return: - """ - raise NotImplementedError('Subclass must implement abstract store method') - - -class ResourceDriver(Driver): - """ - ResourceDriver context manager. - Base Driver for Resource based storage. - - Resource storage structure is a file system base. - <resource root directory>/<resource_name>/<entry_id>/<entry> - entry: can be one single file or multiple files and directories. - """ - - def data(self, entry_type, entry_id, path=None, **kwargs): - """ - Get the binary data from a file in a resource entry. - If the entry is a single file no path needed, - If the entry contain number of files the path will gide to the relevant file. - - resource path: - <resource root directory>/<name>/<entry_id>/<path> - - :param basestring entry_type: resource name. - :param basestring entry_id: id of the entity to resource in the storage. - :param basestring path: path to resource relative to entry_id folder in the storage. - :return: entry file object. - :rtype: bytes - """ - raise NotImplementedError('Subclass must implement abstract get method') - - def download(self, entry_type, entry_id, destination, path=None, **kwargs): - """ - Download the resource to a destination. - Like data method bat this method isn't returning data, - Instead it create a new file in local file system. - - resource path: - <resource root directory>/<name>/<entry_id>/<path> - copy to: - /<destination> - destination can be file or directory - - :param basestring entry_type: resource name. - :param basestring entry_id: id of the entity to resource in the storage. 
- :param basestring destination: path in local file system to download to. - :param basestring path: path to resource relative to entry_id folder in the storage. - """ - raise NotImplementedError('Subclass must implement abstract get method') - - def upload(self, entry_type, entry_id, source, path=None, **kwargs): - """ - Upload the resource from source. - source can be file or directory with files. - - copy from: - /<source> - to resource path: - <resource root directory>/<name>/<entry_id>/<path> - - :param basestring entry_type: resource name. - :param basestring entry_id: id of the entity to resource in the storage. - :param basestring source: source can be file or directory with files. - :param basestring path: path to resource relative to entry_id folder in the storage. - """ - raise NotImplementedError('Subclass must implement abstract get method') - - -class BaseFileSystemDriver(Driver): - """ - Base class which handles storage on the file system. - """ - def __init__(self, *args, **kwargs): - super(BaseFileSystemDriver, self).__init__(*args, **kwargs) - self._lock = RLock() - - def connect(self): - self._lock.acquire() - - def disconnect(self): - self._lock.release() - - def __getstate__(self): - obj_dict = super(BaseFileSystemDriver, self).__getstate__() - del obj_dict['_lock'] - return obj_dict - - def __setstate__(self, obj_dict): - super(BaseFileSystemDriver, self).__setstate__(obj_dict) - vars(self).update(_lock=RLock(), **obj_dict) - - -class FileSystemModelDriver(ModelDriver, BaseFileSystemDriver): - """ - FileSystemModelDriver context manager. - """ - - def __init__(self, directory, **kwargs): - """ - File system implementation for storage driver. - :param str directory: root dir for storage. 
- """ - super(FileSystemModelDriver, self).__init__(**kwargs) - self.directory = directory - - self._join_path = partial(os.path.join, self.directory) - - def __repr__(self): - return '{cls.__name__}(directory={self.directory})'.format( - cls=self.__class__, self=self) - - def create(self, name): - """ - Create directory in storage by path. - tries to create the root directory as well. - :param str name: path of file in storage. - """ - try: - os.makedirs(self.directory) - except (OSError, IOError): - pass - os.makedirs(self._join_path(name)) - - def get(self, name, entry_id, **kwargs): - """ - Getter from storage. - :param str name: name of directory in storage. - :param str entry_id: id of the file to get from storage. - :return: value of file from storage. - :rtype: dict - """ - with open(self._join_path(name, entry_id)) as file_obj: - return jsonpickle.loads(file_obj.read()) - - def store(self, name, entry_id, entry, **kwargs): - """ - Delete from storage. - :param str name: name of directory in storage. - :param str entry_id: id of the file to delete from storage. - """ - with open(self._join_path(name, entry_id), 'w') as file_obj: - file_obj.write(jsonpickle.dumps(entry)) - - def delete(self, name, entry_id, **kwargs): - """ - Delete from storage. - :param str name: name of directory in storage. - :param str entry_id: id of the file to delete from storage. - """ - os.remove(self._join_path(name, entry_id)) - - def iter(self, name, filters=None, **kwargs): - """ - Generator over the entries of directory in storage. - :param str name: name of directory in storage to iter over. - :param dict filters: filters for query - """ - filters = filters or {} - - for entry_id in os.listdir(self._join_path(name)): - value = self.get(name, entry_id=entry_id) - for filter_name, filter_value in filters.items(): - if value.get(filter_name) != filter_value: - break - else: - yield value - - def update(self, name, entry_id, **kwargs): - """ - Updates and entry in storage. 
- - :param str name: name of table/document in storage. - :param str entry_id: id of the document to get from storage. - :param kwargs: the fields to update. - :return: - """ - entry_dict = self.get(name, entry_id) - entry_dict.update(**kwargs) - self.store(name, entry_id, entry_dict) - - -class FileSystemResourceDriver(ResourceDriver, BaseFileSystemDriver): - """ - FileSystemResourceDriver context manager. - """ - - def __init__(self, directory, **kwargs): - """ - File system implementation for storage driver. - :param str directory: root dir for storage. - """ - super(FileSystemResourceDriver, self).__init__(**kwargs) - self.directory = directory - self._join_path = partial(os.path.join, self.directory) - - def __repr__(self): - return '{cls.__name__}(directory={self.directory})'.format( - cls=self.__class__, self=self) - - def create(self, name): - """ - Create directory in storage by path. - tries to create the root directory as well. - :param basestring name: path of file in storage. - """ - try: - os.makedirs(self.directory) - except (OSError, IOError): - pass - os.makedirs(self._join_path(name)) - - def data(self, entry_type, entry_id, path=None): - """ - Retrieve the content of a file system storage resource. - - :param basestring entry_type: the type of the entry. - :param basestring entry_id: the id of the entry. - :param basestring path: a path to a specific resource. 
- :return: the content of the file - :rtype: bytes - """ - resource_relative_path = os.path.join(entry_type, entry_id, path or '') - resource = os.path.join(self.directory, resource_relative_path) - if not os.path.exists(resource): - raise StorageError("Resource {0} does not exist".format(resource_relative_path)) - if not os.path.isfile(resource): - resources = os.listdir(resource) - if len(resources) != 1: - raise StorageError('No resource in path: {0}'.format(resource)) - resource = os.path.join(resource, resources[0]) - with open(resource, 'rb') as resource_file: - return resource_file.read() - - def download(self, entry_type, entry_id, destination, path=None): - """ - Download a specific file or dir from the file system resource storage. - - :param basestring entry_type: the name of the entry. - :param basestring entry_id: the id of the entry - :param basestring destination: the destination of the files. - :param basestring path: a path on the remote machine relative to the root of the entry. - """ - resource_relative_path = os.path.join(entry_type, entry_id, path or '') - resource = os.path.join(self.directory, resource_relative_path) - if not os.path.exists(resource): - raise StorageError("Resource {0} does not exist".format(resource_relative_path)) - if os.path.isfile(resource): - shutil.copy2(resource, destination) - else: - distutils.dir_util.copy_tree(resource, destination) # pylint: disable=no-member - - def upload(self, entry_type, entry_id, source, path=None): - """ - Uploads a specific file or dir to the file system resource storage. - - :param basestring entry_type: the name of the entry. - :param basestring entry_id: the id of the entry - :param source: the source of the files to upload. - :param path: the destination of the file/s relative to the entry root dir. 
- """ - resource_directory = os.path.join(self.directory, entry_type, entry_id) - if not os.path.exists(resource_directory): - os.makedirs(resource_directory) - destination = os.path.join(resource_directory, path or '') - if os.path.isfile(source): - shutil.copy2(source, destination) - else: - distutils.dir_util.copy_tree(source, destination) # pylint: disable=no-member http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/storage/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/storage/exceptions.py b/aria/storage/exceptions.py index 22dfc50..f982f63 100644 --- a/aria/storage/exceptions.py +++ b/aria/storage/exceptions.py @@ -12,7 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +""" +Storage based exceptions +""" from .. import exceptions http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/storage/filesystem_api.py ---------------------------------------------------------------------- diff --git a/aria/storage/filesystem_api.py b/aria/storage/filesystem_api.py new file mode 100644 index 0000000..f28d1f6 --- /dev/null +++ b/aria/storage/filesystem_api.py @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Filesystem based API Base +""" +from multiprocessing import RLock + +from . import api + + +class BaseFileSystemAPI(api.StorageAPI): + """ + Base class which handles storage on the file system. + """ + + def create(self, **kwargs): + super(BaseFileSystemAPI, self).create(**kwargs) + + def __init__(self, *args, **kwargs): + super(BaseFileSystemAPI, self).__init__(*args, **kwargs) + self._lock = RLock() + + def _establish_connection(self): + self._lock.acquire() + + def _destroy_connection(self): + self._lock.release() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/storage/mapi/__init__.py ---------------------------------------------------------------------- diff --git a/aria/storage/mapi/__init__.py b/aria/storage/mapi/__init__.py new file mode 100644 index 0000000..d4a8c6e --- /dev/null +++ b/aria/storage/mapi/__init__.py @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +""" +A collection of MAPIs +""" +from .filesystem import FileSystemModelAPI +from .inmemory import InMemoryModelAPI +from .sql import SQLAlchemyModelAPI http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/storage/mapi/filesystem.py ---------------------------------------------------------------------- diff --git a/aria/storage/mapi/filesystem.py b/aria/storage/mapi/filesystem.py new file mode 100644 index 0000000..fa24869 --- /dev/null +++ b/aria/storage/mapi/filesystem.py @@ -0,0 +1,118 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +File system based MAPI +""" +import os +from functools import partial + +import jsonpickle + +from .. import ( + api, + filesystem_api +) + + +class FileSystemModelAPI(api.ModelAPI, filesystem_api.BaseFileSystemAPI): + """ + File system model storage. + """ + + def __init__(self, directory, **kwargs): + """ + File system implementation for storage api. + :param str directory: root dir for storage. 
+ """ + super(FileSystemModelAPI, self).__init__(**kwargs) + self.directory = directory + self.base_path = os.path.join(self.directory, self.name) + self._join_path = partial(os.path.join, self.base_path) + + def __repr__(self): + return '{cls.__name__}(directory={self.directory})'.format( + cls=self.__class__, self=self) + + def create(self, **kwargs): + """ + Create directory in storage by path. + tries to create the root directory as well. + :param str name: path of file in storage. + """ + with self.connect(): + try: + os.makedirs(self.directory) + except (OSError, IOError): + pass + os.makedirs(self.base_path) + + def get(self, entry_id, **kwargs): + """ + Getter from storage. + :param str entry_id: id of the file to get from storage. + :return: value of file from storage. + :rtype: dict + """ + with self.connect(): + with open(self._join_path(entry_id)) as file_obj: + return jsonpickle.loads(file_obj.read()) + + def store(self, entry, **kwargs): + """ + Store the entry in storage. + :param Model entry: the entry to store; it is written to a file named after its id. + """ + with self.connect(): + with open(self._join_path(entry.id), 'w') as file_obj: + file_obj.write(jsonpickle.dumps(entry)) + + def delete(self, entry_id, **kwargs): + """ + Delete from storage. + :param str name: name of directory in storage. + :param str entry_id: id of the file to delete from storage. + """ + with self.connect(): + os.remove(self._join_path(entry_id)) + + def iter(self, filters=None, **kwargs): + """ + Generator over the entries of directory in storage. + :param dict filters: filters for query + """ + filters = filters or {} + with self.connect(): + for entry_id in os.listdir(self.base_path): + value = self.get(entry_id=entry_id) + for filter_name, filter_value in filters.items(): + if value.get(filter_name) != filter_value: + break + else: + yield value + + def update(self, entry_id, **kwargs): + """ + Updates an entry in storage. + + :param str name: name of table/document in storage. 
+ :param str entry_id: id of the document to get from storage. + :param kwargs: the fields to update. + :return: + """ + with self.connect(): + entry = self.get(entry_id) + for key, value in kwargs.items(): + setattr(entry, key, value) + self.store(entry) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/88bc5d18/aria/storage/mapi/inmemory.py ---------------------------------------------------------------------- diff --git a/aria/storage/mapi/inmemory.py b/aria/storage/mapi/inmemory.py new file mode 100644 index 0000000..09dbcfc --- /dev/null +++ b/aria/storage/mapi/inmemory.py @@ -0,0 +1,148 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# DEPRECATED +#pylint: skip-file + +from collections import namedtuple + + +from .. import api +from ..structures import orm + + +_Pointer = namedtuple('_Pointer', 'name, is_iter') + +storage = {} + + +class InMemoryModelAPI(api.ModelAPI): + def __init__(self, *args, **kwargs): + """ + Managing the model in the storage, using the driver. + + :param basestring name: the name of the model. + :param ModelDriver driver: the driver which supports this model in the storage. + :param Model model_cls: table/document class model. 
+ """ + super(InMemoryModelAPI, self).__init__(*args, **kwargs) + self.pointer_mapping = {} + + def create(self): + """ + Creates the model in the storage. + """ + with self.connect(): + storage[self.name] = {} + self._setup_pointers_mapping() + + def _setup_pointers_mapping(self): + for field_name, field_cls in vars(self.model_cls).items(): + if not (getattr(field_cls, 'impl', None) is not None and + isinstance(field_cls.impl.parent_token, orm.RelationshipProperty)): + continue + pointer_key = _Pointer(field_name, is_iter=False) + self.pointer_mapping[pointer_key] = self.__class__( + name=api.generate_lower_name(field_cls.class_), + model_cls=field_cls.class_) + + def get(self, entry_id, **kwargs): + """ + Getter for the model from the storage. + + :param basestring entry_id: the id of the table/document. + :return: model instance + :rtype: Model + """ + with self.connect(): + data = storage[self.name][entry_id] + data.update(self._get_pointers(data, **kwargs)) + return self.model_cls(**data) + + def store(self, entry, **kwargs): + """ + Setter for the model in the storage. + + :param Model entry: the table/document to store. + """ + with self.connect(): + data = entry.to_dict + data.update(self._store_pointers(data, **kwargs)) + storage[self.name][entry.id] = data + + def delete(self, entry_id, **kwargs): + """ + Delete the model from storage. + + :param basestring entry_id: id of the entity to delete from storage. + """ + entry = self.get(entry_id) + with self.connect(): + self._delete_pointers(entry, **kwargs) + storage[self.name].pop(entry_id) + + def iter(self, **kwargs): + """ + Generator over the entries of model in storage. + """ + with self.connect(): + for data in storage[self.name].values(): + data.update(self._get_pointers(data, **kwargs)) + yield self.model_cls(**data) + + def update(self, entry_id, **kwargs): + """ + Updates and entry in storage. + + :param str entry_id: the id of the table/document. + :param kwargs: the fields to update. 
+ :return: + """ + with self.connect(): + storage[self.name][entry_id].update(**kwargs) + + def _get_pointers(self, data, **kwargs): + pointers = {} + for field, schema in self.pointer_mapping.items(): + if field.is_iter: + pointers[field.name] = [ + schema.get(entry_id=pointer_id, **kwargs) + for pointer_id in data[field.name] + if pointer_id] + elif data[field.name]: + pointers[field.name] = schema.get(entry_id=data[field.name], **kwargs) + return pointers + + def _store_pointers(self, data, **kwargs): + pointers = {} + for field, model_api in self.pointer_mapping.items(): + if field.is_iter: + pointers[field.name] = [] + for iter_entity in data[field.name]: + pointers[field.name].append(iter_entity.id) + model_api.store(iter_entity, **kwargs) + else: + pointers[field.name] = data[field.name].id + model_api.store(data[field.name], **kwargs) + return pointers + + def _delete_pointers(self, entry, **kwargs): + for field, schema in self.pointer_mapping.items(): + if field.is_iter: + for iter_entry in getattr(entry, field.name): + schema.delete(iter_entry.id, **kwargs) + else: + schema.delete(getattr(entry, field.name).id, **kwargs)