ARIA-30 SQL based storage implementation
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/c6c92ae5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/c6c92ae5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/c6c92ae5 Branch: refs/heads/ARIA-30-SQL-based-storage-implementation Commit: c6c92ae5b25b49e62b06f8867f6a7b0046f04428 Parents: fe974e4 Author: mxmrlv <[email protected]> Authored: Sun Nov 27 13:20:46 2016 +0200 Committer: mxmrlv <[email protected]> Committed: Thu Dec 8 11:59:24 2016 +0200 ---------------------------------------------------------------------- aria/__init__.py | 43 +- aria/orchestrator/__init__.py | 4 +- aria/orchestrator/context/common.py | 29 +- aria/orchestrator/context/exceptions.py | 4 +- aria/orchestrator/context/operation.py | 27 +- aria/orchestrator/context/toolbelt.py | 20 +- aria/orchestrator/context/workflow.py | 51 +- aria/orchestrator/exceptions.py | 7 +- aria/orchestrator/workflows/api/task.py | 10 +- aria/orchestrator/workflows/builtin/heal.py | 25 +- aria/orchestrator/workflows/builtin/install.py | 7 +- .../orchestrator/workflows/builtin/uninstall.py | 7 +- .../orchestrator/workflows/builtin/workflows.py | 13 +- aria/orchestrator/workflows/core/engine.py | 6 +- aria/orchestrator/workflows/core/task.py | 38 +- aria/storage/__init__.py | 372 +----- aria/storage/api.py | 182 +++ aria/storage/core.py | 125 ++ aria/storage/drivers.py | 416 ------- aria/storage/exceptions.py | 4 +- aria/storage/filesystem_rapi.py | 150 +++ aria/storage/models.py | 702 ++++++----- aria/storage/sql_mapi.py | 382 ++++++ aria/storage/structures.py | 399 +++--- aria/utils/application.py | 14 +- requirements.txt | 1 + tests/mock/context.py | 50 +- tests/mock/models.py | 102 +- tests/orchestrator/context/test_operation.py | 80 +- tests/orchestrator/context/test_toolbelt.py | 92 +- tests/orchestrator/context/test_workflow.py | 37 +- tests/orchestrator/workflows/api/test_task.py | 76 +- .../orchestrator/workflows/builtin/__init__.py | 35 +- .../workflows/builtin/test_execute_operation.py | 17 +- .../orchestrator/workflows/builtin/test_heal.py | 23 +- .../workflows/builtin/test_install.py | 16 +- .../workflows/builtin/test_uninstall.py | 13 +- .../orchestrator/workflows/core/test_engine.py | 47 +- tests/orchestrator/workflows/core/test_task.py | 37 +- .../test_task_graph_into_exececution_graph.py | 15 +- tests/requirements.txt | 2 +- tests/storage/__init__.py | 75 +- tests/storage/test_drivers.py | 135 --- tests/storage/test_field.py | 124 -- tests/storage/test_model_storage.py | 134 +- tests/storage/test_models.py | 1143 +++++++++++++----- tests/storage/test_models_api.py | 70 -- tests/storage/test_resource_storage.py | 62 +- 48 files changed, 2854 insertions(+), 2569 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/__init__.py ---------------------------------------------------------------------- diff --git a/aria/__init__.py b/aria/__init__.py index 3f81f98..b000397 100644 --- a/aria/__init__.py +++ b/aria/__init__.py @@ -23,7 +23,6 @@ import pkgutil from .VERSION import version as __version__ from .orchestrator.decorators import workflow, operation -from .storage import ModelStorage, ResourceStorage, models, ModelDriver, ResourceDriver from . import ( utils, parser, @@ -37,7 +36,6 @@ __all__ = ( 'operation', ) -_model_storage = {} _resource_storage = {} @@ -58,37 +56,38 @@ def install_aria_extensions(): del sys.modules[module_name] -def application_model_storage(driver): +def application_model_storage(api, api_kwargs=None): """ Initiate model storage for the supplied storage driver """ + models = [ + storage.models.Plugin, + storage.models.ProviderContext, - assert isinstance(driver, ModelDriver) - if driver not in _model_storage: - _model_storage[driver] = ModelStorage( - driver, model_classes=[ - models.Node, - models.NodeInstance, - models.Plugin, - models.Blueprint, - models.Snapshot, - models.Deployment, - models.DeploymentUpdate, - models.DeploymentModification, - models.Execution, - models.ProviderContext, - models.Task, - ]) - return _model_storage[driver] + storage.models.Blueprint, + storage.models.Deployment, + storage.models.DeploymentUpdate, + storage.models.DeploymentUpdateStep, + storage.models.DeploymentModification, + + storage.models.Node, + storage.models.NodeInstance, + storage.models.Relationship, + storage.models.RelationshipInstance, + + storage.models.Execution, + storage.models.Task, + ] + # if api not in _model_storage: + return storage.ModelStorage(api, items=models, api_kwargs=api_kwargs or {}) def application_resource_storage(driver): """ Initiate resource storage for the supplied storage driver """ - assert isinstance(driver, ResourceDriver) if driver not in _resource_storage: - _resource_storage[driver] = ResourceStorage( + _resource_storage[driver] = storage.ResourceStorage( driver, resources=[ 'blueprint', http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/__init__.py b/aria/orchestrator/__init__.py index a5aeec7..90d6442 100644 --- a/aria/orchestrator/__init__.py +++ b/aria/orchestrator/__init__.py @@ -12,7 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +""" +Aria orchestrator +""" from .decorators import workflow, operation from . import ( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index f2bf83b..14efd9d 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -32,8 +32,7 @@ class BaseContext(logger.LoggerMixin): model_storage, resource_storage, deployment_id, - workflow_id, - execution_id=None, + workflow_name, task_max_attempts=1, task_retry_interval=0, task_ignore_failure=False, @@ -44,8 +43,7 @@ class BaseContext(logger.LoggerMixin): self._model = model_storage self._resource = resource_storage self._deployment_id = deployment_id - self._workflow_id = workflow_id - self._execution_id = execution_id or str(uuid4()) + self._workflow_name = workflow_name self._task_max_attempts = task_max_attempts self._task_retry_interval = task_retry_interval self._task_ignore_failure = task_ignore_failure @@ -54,8 +52,7 @@ class BaseContext(logger.LoggerMixin): return ( '{name}(name={self.name}, ' 'deployment_id={self._deployment_id}, ' - 'workflow_id={self._workflow_id}, ' - 'execution_id={self._execution_id})' + 'workflow_name={self._workflow_name}, ' .format(name=self.__class__.__name__, self=self)) @property @@ -79,7 +76,7 @@ class BaseContext(logger.LoggerMixin): """ The blueprint model """ - return self.model.blueprint.get(self.deployment.blueprint_id) + return self.deployment.blueprint @property def deployment(self): @@ -89,20 +86,6 @@ class BaseContext(logger.LoggerMixin): return self.model.deployment.get(self._deployment_id) @property - def execution(self): - """ - The execution model - """ - return self.model.execution.get(self._execution_id) - - @execution.setter - def execution(self, value): - """ - Store the execution in the model storage - """ - self.model.execution.store(value) - - @property def name(self): """ The operation name @@ -136,6 +119,6 @@ class BaseContext(logger.LoggerMixin): Read a deployment resource as string from the resource storage """ try: - return self.resource.deployment.data(entry_id=self.deployment.id, path=path) + return self.resource.deployment.read(entry_id=self.deployment.id, path=path) except exceptions.StorageError: - return self.resource.blueprint.data(entry_id=self.blueprint.id, path=path) + return self.resource.blueprint.read(entry_id=self.blueprint.id, path=path) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/context/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/exceptions.py b/aria/orchestrator/context/exceptions.py index 6704bbc..fe762e1 100644 --- a/aria/orchestrator/context/exceptions.py +++ b/aria/orchestrator/context/exceptions.py @@ -12,7 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +""" +Context based exceptions +""" from ..exceptions import OrchestratorError http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index bf3686d..a73bad1 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -26,17 +26,17 @@ class BaseOperationContext(BaseContext): Context object used during operation creation and execution """ - def __init__(self, name, workflow_context, task, **kwargs): + def __init__(self, name, workflow_context, task, actor, **kwargs): super(BaseOperationContext, self).__init__( name=name, model_storage=workflow_context.model, resource_storage=workflow_context.resource, deployment_id=workflow_context._deployment_id, - workflow_id=workflow_context._workflow_id, - execution_id=workflow_context._execution_id, + workflow_name=workflow_context._workflow_name, **kwargs) self._task_model = task - self._actor = self.task.actor + self._task_id = task.id + self._actor_id = actor.id def __repr__(self): details = 'operation_mapping={task.operation_mapping}; ' \ @@ -50,7 +50,7 @@ class BaseOperationContext(BaseContext): The task in the model storage :return: Task model """ - return self._task_model + return self.model.task.get(self._task_id) class NodeOperationContext(BaseOperationContext): @@ -63,7 +63,7 @@ class NodeOperationContext(BaseOperationContext): the node of the current operation :return: """ - return self._actor.node + return self.node_instance.node @property def node_instance(self): @@ -71,7 +71,7 @@ class NodeOperationContext(BaseOperationContext): The node instance of the current operation :return: """ - return self._actor + return self.model.node_instance.get(self._actor_id) class RelationshipOperationContext(BaseOperationContext): @@ -84,7 +84,7 @@ class RelationshipOperationContext(BaseOperationContext): The source node :return: """ - return self.model.node.get(self.relationship.source_id) + return self.relationship.source_node @property def source_node_instance(self): @@ -92,7 +92,7 @@ class RelationshipOperationContext(BaseOperationContext): The source node instance :return: """ - return self.model.node_instance.get(self.relationship_instance.source_id) + return self.relationship_instance.source_node_instance @property def target_node(self): @@ -100,7 +100,7 @@ class RelationshipOperationContext(BaseOperationContext): The target node :return: """ - return self.model.node.get(self.relationship.target_id) + return self.relationship.target_node @property def target_node_instance(self): @@ -108,7 +108,7 @@ class RelationshipOperationContext(BaseOperationContext): The target node instance :return: """ - return self.model.node_instance.get(self._actor.target_id) + return self.relationship_instance.target_node_instance @property def relationship(self): @@ -116,7 +116,8 @@ class RelationshipOperationContext(BaseOperationContext): The relationship of the current operation :return: """ - return self._actor.relationship + + return self.relationship_instance.relationship @property def relationship_instance(self): @@ -124,4 +125,4 @@ class RelationshipOperationContext(BaseOperationContext): The relationship instance of the current operation :return: """ - return self._actor + return self.model.relationship_instance.get(self._actor_id) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/context/toolbelt.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/toolbelt.py b/aria/orchestrator/context/toolbelt.py index 0aad89c..301b013 100644 --- a/aria/orchestrator/context/toolbelt.py +++ b/aria/orchestrator/context/toolbelt.py @@ -27,30 +27,14 @@ class NodeToolBelt(object): self._op_context = operation_context @property - def dependent_node_instances(self): - """ - Any node instance which has a relationship to the current node instance. - :return: - """ - assert isinstance(self._op_context, operation.NodeOperationContext) - node_instances = self._op_context.model.node_instance.iter( - filters={'deployment_id': self._op_context.deployment.id} - ) - for node_instance in node_instances: - for relationship_instance in node_instance.relationship_instances: - if relationship_instance.target_id == self._op_context.node_instance.id: - yield node_instance - - @property def host_ip(self): """ The host ip of the current node :return: """ assert isinstance(self._op_context, operation.NodeOperationContext) - host_id = self._op_context._actor.host_id - host_instance = self._op_context.model.node_instance.get(host_id) - return host_instance.runtime_properties.get('ip') + host = self._op_context.node_instance.host + return host.runtime_properties.get('ip') class RelationshipToolBelt(object): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py index 3dc222b..e2e8e25 100644 --- a/aria/orchestrator/context/workflow.py +++ b/aria/orchestrator/context/workflow.py @@ -19,8 +19,7 @@ Workflow and operation contexts import threading from contextlib import contextmanager - -from aria import storage +from datetime import datetime from .exceptions import ContextException from .common import BaseContext @@ -30,53 +29,73 @@ class WorkflowContext(BaseContext): """ Context object used during workflow creation and execution """ - def __init__(self, parameters=None, *args, **kwargs): + def __init__(self, parameters=None, execution_id=None, *args, **kwargs): super(WorkflowContext, self).__init__(*args, **kwargs) self.parameters = parameters or {} # TODO: execution creation should happen somewhere else # should be moved there, when such logical place exists - try: - self.model.execution.get(self._execution_id) - except storage.exceptions.StorageError: - self._create_execution() + self._execution_id = self._create_execution() if execution_id is None else execution_id def __repr__(self): return ( '{name}(deployment_id={self._deployment_id}, ' - 'workflow_id={self._workflow_id}, ' - 'execution_id={self._execution_id})'.format( + 'workflow_name={self._workflow_name}'.format( name=self.__class__.__name__, self=self)) def _create_execution(self): execution_cls = self.model.execution.model_cls + now = datetime.utcnow() execution = self.model.execution.model_cls( - id=self._execution_id, - deployment_id=self.deployment.id, - workflow_id=self._workflow_id, blueprint_id=self.blueprint.id, + deployment_id=self.deployment.id, + workflow_name=self._workflow_name, + created_at=now, status=execution_cls.PENDING, parameters=self.parameters, ) - self.model.execution.store(execution) + self.model.execution.put(execution) + return execution.id + + @property + def execution(self): + """ + The execution model + """ + return self.model.execution.get(self._execution_id) + + @execution.setter + def execution(self, value): + """ + Store the execution in the model storage + """ + self.model.execution.put(value) @property def nodes(self): """ Iterator over nodes """ - return self.model.node.iter(filters={'blueprint_id': self.blueprint.id}) + return self.model.node.iter( + filters={ + 'deployment_id': self.deployment.id + } + ) @property def node_instances(self): """ Iterator over node instances """ - return self.model.node_instance.iter(filters={'deployment_id': self.deployment.id}) + return self.model.node_instance.iter( + filters={ + 'deployment_id': self.deployment.id + } + ) class _CurrentContext(threading.local): """ - Provides thread-level context, which sugarcoats the task api. + Provides thread-level context, which sugarcoats the task mapi. """ def __init__(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py index 75b37cf..1a48194 100644 --- a/aria/orchestrator/exceptions.py +++ b/aria/orchestrator/exceptions.py @@ -12,9 +12,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +""" +Orchestrator based exceptions +""" from aria.exceptions import AriaError class OrchestratorError(AriaError): + """ + Orchestrator based exception + """ pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index 4d36725..1c12407 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -18,7 +18,7 @@ Provides the tasks to be entered into the task graph """ from uuid import uuid4 -import aria +from aria.storage import models from ... import context from .. import exceptions @@ -75,8 +75,8 @@ class OperationTask(BaseTask): :param actor: the operation host on which this operation is registered. :param inputs: operation inputs. """ - assert isinstance(actor, (aria.storage.models.NodeInstance, - aria.storage.models.RelationshipInstance)) + assert isinstance(actor, (models.NodeInstance, + models.RelationshipInstance)) super(OperationTask, self).__init__() self.actor = actor self.name = '{name}.{actor.id}'.format(name=name, actor=actor) @@ -97,7 +97,7 @@ class OperationTask(BaseTask): :param instance: the node of which this operation belongs to. :param name: the name of the operation. """ - assert isinstance(instance, aria.storage.models.NodeInstance) + assert isinstance(instance, models.NodeInstance) operation_details = instance.node.operations[name] operation_inputs = operation_details.get('inputs', {}) operation_inputs.update(inputs or {}) @@ -119,7 +119,7 @@ class OperationTask(BaseTask): with 'source_operations' and 'target_operations' :param inputs any additional inputs to the operation """ - assert isinstance(instance, aria.storage.models.RelationshipInstance) + assert isinstance(instance, models.RelationshipInstance) if operation_end not in [cls.TARGET_OPERATION, cls.SOURCE_OPERATION]: raise exceptions.TaskException('The operation end should be {0} or {1}'.format( cls.TARGET_OPERATION, cls.SOURCE_OPERATION http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/workflows/builtin/heal.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/heal.py b/aria/orchestrator/workflows/builtin/heal.py index dbfc14e..de07095 100644 --- a/aria/orchestrator/workflows/builtin/heal.py +++ b/aria/orchestrator/workflows/builtin/heal.py @@ -84,16 +84,19 @@ def heal_uninstall(ctx, graph, failing_node_instances, targeted_node_instances): # create dependencies between the node instance sub workflow for node_instance in failing_node_instances: node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] - for relationship_instance in reversed(node_instance.relationship_instances): - graph.add_dependency(node_instance_sub_workflows[relationship_instance.target_id], - node_instance_sub_workflow) + for relationship_instance in reversed(node_instance.outbound_relationship_instances): + graph.add_dependency( + node_instance_sub_workflows[relationship_instance.target_node_instance.id], + node_instance_sub_workflow) # Add operations for intact nodes depending on a node instance belonging to node_instances for node_instance in targeted_node_instances: node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] - for relationship_instance in reversed(node_instance.relationship_instances): - target_node_instance = ctx.model.node_instance.get(relationship_instance.target_id) + for relationship_instance in reversed(node_instance.outbound_relationship_instances): + + target_node_instance = \ + ctx.model.node_instance.get(relationship_instance.target_node_instance.id) target_node_instance_subgraph = node_instance_sub_workflows[target_node_instance.id] graph.add_dependency(target_node_instance_subgraph, node_instance_sub_workflow) @@ -134,9 +137,10 @@ def heal_install(ctx, graph, failing_node_instances, targeted_node_instances): # create dependencies between the node instance sub workflow for node_instance in failing_node_instances: node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] - if node_instance.relationship_instances: - dependencies = [node_instance_sub_workflows[relationship_instance.target_id] - for relationship_instance in node_instance.relationship_instances] + if node_instance.outbound_relationship_instances: + dependencies = \ + [node_instance_sub_workflows[relationship_instance.target_node_instance.id] + for relationship_instance in node_instance.outbound_relationship_instances] graph.add_dependency(node_instance_sub_workflow, dependencies) # Add operations for intact nodes depending on a node instance @@ -144,8 +148,9 @@ def heal_install(ctx, graph, failing_node_instances, targeted_node_instances): for node_instance in targeted_node_instances: node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] - for relationship_instance in node_instance.relationship_instances: - target_node_instance = ctx.model.node_instance.get(relationship_instance.target_id) + for relationship_instance in node_instance.outbound_relationship_instances: + target_node_instance = ctx.model.node_instance.get( + relationship_instance.target_node_instance.id) target_node_instance_subworkflow = node_instance_sub_workflows[target_node_instance.id] graph.add_dependency(node_instance_sub_workflow, target_node_instance_subworkflow) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/workflows/builtin/install.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/install.py b/aria/orchestrator/workflows/builtin/install.py index 0ab3ad6..eb5b4e8 100644 --- a/aria/orchestrator/workflows/builtin/install.py +++ b/aria/orchestrator/workflows/builtin/install.py @@ -47,7 +47,8 @@ def install(ctx, graph, node_instances=(), node_instance_sub_workflows=None): # create dependencies between the node instance sub workflow for node_instance in node_instances: node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] - if node_instance.relationship_instances: - dependencies = [node_instance_sub_workflows[relationship_instance.target_id] - for relationship_instance in node_instance.relationship_instances] + if node_instance.outbound_relationship_instances: + dependencies = [ + node_instance_sub_workflows[relationship_instance.target_node_instance.id] + for relationship_instance in node_instance.outbound_relationship_instances] graph.add_dependency(node_instance_sub_workflow, dependencies) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/workflows/builtin/uninstall.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/uninstall.py b/aria/orchestrator/workflows/builtin/uninstall.py index f4e965c..db1c0cc 100644 --- a/aria/orchestrator/workflows/builtin/uninstall.py +++ b/aria/orchestrator/workflows/builtin/uninstall.py @@ -27,7 +27,7 @@ from ..api import task def uninstall(ctx, graph, node_instances=(), node_instance_sub_workflows=None): """ The uninstall workflow - :param WorkflowContext context: the workflow context + :param WorkflowContext ctx: the workflow context :param TaskGraph graph: the graph which will describe the workflow. :param node_instances: the node instances on which to run the workflow :param dict node_instance_sub_workflows: a dictionary of subworkflows with id as key and @@ -47,6 +47,7 @@ def uninstall(ctx, graph, node_instances=(), node_instance_sub_workflows=None): # create dependencies between the node instance sub workflow for node_instance in node_instances: node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] - for relationship_instance in reversed(node_instance.relationship_instances): - graph.add_dependency(node_instance_sub_workflows[relationship_instance.target_id], + for relationship_instance in reversed(node_instance.outbound_relationship_instances): + target_id = relationship_instance.target_node_instance.id + graph.add_dependency(node_instance_sub_workflows[target_id], node_instance_sub_workflow) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/workflows/builtin/workflows.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/workflows.py b/aria/orchestrator/workflows/builtin/workflows.py index 0eb8c34..4f765b3 100644 --- a/aria/orchestrator/workflows/builtin/workflows.py +++ b/aria/orchestrator/workflows/builtin/workflows.py @@ -37,7 +37,6 @@ __all__ = ( def install_node_instance(graph, node_instance, **kwargs): """ A workflow which installs a node instance. - :param WorkflowContext ctx: the workflow context :param TaskGraph graph: the tasks graph of which to edit :param node_instance: the node instance to install :return: @@ -68,7 +67,6 @@ def install_node_instance(graph, node_instance, **kwargs): def preconfigure_relationship(graph, node_instance, **kwargs): """ - :param context: :param graph: :param node_instance: :return: @@ -82,7 +80,6 @@ def preconfigure_relationship(graph, node_instance, **kwargs): def postconfigure_relationship(graph, node_instance, **kwargs): """ - :param context: :param graph: :param node_instance: :return: @@ -96,7 +93,6 @@ def postconfigure_relationship(graph, node_instance, **kwargs): def establish_relationship(graph, node_instance, **kwargs): """ - :param context: :param graph: :param node_instance: :return: @@ -113,7 +109,6 @@ def establish_relationship(graph, node_instance, **kwargs): def uninstall_node_instance(graph, node_instance, **kwargs): """ A workflow which uninstalls a node instance. - :param WorkflowContext context: the workflow context :param TaskGraph graph: the tasks graph of which to edit :param node_instance: the node instance to uninstall :return: @@ -135,7 +130,6 @@ def uninstall_node_instance(graph, node_instance, **kwargs): def unlink_relationship(graph, node_instance): """ - :param context: :param graph: :param node_instance: :return: @@ -179,8 +173,8 @@ def relationships_tasks(graph, operation_name, node_instance): :return: """ relationships_groups = groupby( - node_instance.relationship_instances, - key=lambda relationship_instance: relationship_instance.relationship.target_id) + node_instance.outbound_relationship_instances, + key=lambda relationship_instance: relationship_instance.target_node_instance.id) sub_tasks = [] for _, (_, relationship_group) in enumerate(relationships_groups): @@ -196,11 +190,8 @@ def relationships_tasks(graph, operation_name, node_instance): def relationship_tasks(relationship_instance, operation_name): """ Creates a relationship task source and target. - :param NodeInstance node_instance: the node instance of the relationship :param RelationshipInstance relationship_instance: the relationship instance itself - :param WorkflowContext context: :param operation_name: - :param index: the relationship index - enables pretty print :return: """ source_operation = task.OperationTask.relationship_instance( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 87ea8c6..2d26aeb 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -100,7 +100,11 @@ class Engine(logger.LoggerMixin): return len(self._execution_graph.node) == 0 def _tasks_iter(self): - return (data['task'] for _, data in self._execution_graph.nodes_iter(data=True)) + for _, data in self._execution_graph.nodes_iter(data=True): + task = data['task'] + if isinstance(task, engine_task.OperationTask): + self._workflow_context.model.task.refresh(task.model_task) + yield task def _handle_executable_task(self, task): if isinstance(task, engine_task.StubTask): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py index a583cfc..0be17fe 100644 --- a/aria/orchestrator/workflows/core/task.py +++ b/aria/orchestrator/workflows/core/task.py @@ -106,32 +106,34 @@ class OperationTask(BaseTask): def __init__(self, api_task, *args, **kwargs): super(OperationTask, self).__init__(id=api_task.id, **kwargs) self._workflow_context = api_task._workflow_context - task_model = api_task._workflow_context.model.task.model_cls - operation_task = task_model( - id=api_task.id, - name=api_task.name, - operation_mapping=api_task.operation_mapping, - actor=api_task.actor, - inputs=api_task.inputs, - status=task_model.PENDING, - execution_id=self._workflow_context._execution_id, - max_attempts=api_task.max_attempts, - retry_interval=api_task.retry_interval, - ignore_failure=api_task.ignore_failure - ) + base_task_model = api_task._workflow_context.model.task.model_cls if isinstance(api_task.actor, models.NodeInstance): context_class = operation_context.NodeOperationContext + task_model_cls = base_task_model.as_node_instance elif isinstance(api_task.actor, models.RelationshipInstance): context_class = operation_context.RelationshipOperationContext + task_model_cls = base_task_model.as_relationship_instance else: - raise RuntimeError('No operation context could be created for {0}' - .format(api_task.actor.model_cls)) + raise RuntimeError('No operation context could be created for {actor.model_cls}' + .format(actor=api_task.actor)) + + operation_task = task_model_cls( + name=api_task.name, + operation_mapping=api_task.operation_mapping, + instance_id=api_task.actor.id, + inputs=api_task.inputs, + status=base_task_model.PENDING, + max_attempts=api_task.max_attempts, + retry_interval=api_task.retry_interval, + ignore_failure=api_task.ignore_failure, + ) + self._workflow_context.model.task.put(operation_task) self._ctx = context_class(name=api_task.name, workflow_context=self._workflow_context, - task=operation_task) - self._workflow_context.model.task.store(operation_task) + task=operation_task, + actor=operation_task.actor) self._task_id = operation_task.id self._update_fields = None @@ -161,7 +163,7 @@ class OperationTask(BaseTask): @model_task.setter def model_task(self, value): - self._workflow_context.model.task.store(value) + self._workflow_context.model.task.put(value) @property def context(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/storage/__init__.py ---------------------------------------------------------------------- diff --git a/aria/storage/__init__.py b/aria/storage/__init__.py index 2d142a5..fd69d47 100644 --- a/aria/storage/__init__.py +++ b/aria/storage/__init__.py @@ -20,14 +20,14 @@ Path: aria.storage Storage package is a generic abstraction over different storage types. We define this abstraction with the following components: -1. storage: simple api to use -2. driver: implementation of the database client api. +1. storage: simple mapi to use +2. driver: implementation of the database client mapi. 3. model: defines the structure of the table/document. 4. field: defines a field/item in the model. API: * application_storage_factory - function, default Aria storage factory. - * Storage - class, simple storage api. + * Storage - class, simple storage mapi. * models - module, default Aria standard models. * structures - module, default Aria structures - holds the base model, and different fields types. @@ -37,354 +37,28 @@ API: * drivers - module, a pool of Aria standard drivers. * StorageDriver - class, abstract model implementation. """ -# todo: rewrite the above package documentation -# (something like explaning the two types of storage - models and resources) - -from collections import namedtuple - -from .structures import Storage, Field, Model, IterField, PointerField -from .drivers import ( - ModelDriver, - ResourceDriver, - FileSystemResourceDriver, - FileSystemModelDriver, +from .core import ( + Storage, + ModelStorage, + ResourceStorage, +) +from . import ( + exceptions, + api, + structures, + core, + filesystem_rapi, + sql_mapi, + models ) -from . import models, exceptions __all__ = ( - 'ModelStorage', - 'ResourceStorage', - 'FileSystemModelDriver', - 'models', + 'exceptions', 'structures', - 'Field', - 'IterField', - 'PointerField', - 'Model', - 'drivers', - 'ModelDriver', - 'ResourceDriver', - 'FileSystemResourceDriver', + # 'Storage', + # 'ModelStorage', + # 'ResourceStorage', + 'filesystem_rapi', + 'sql_mapi', + 'api' ) -# todo: think about package output api's... -# todo: in all drivers name => entry_type -# todo: change in documentation str => basestring - - -class ModelStorage(Storage): - """ - Managing the models storage. - """ - def __init__(self, driver, model_classes=(), **kwargs): - """ - Simple storage client api for Aria applications. - The storage instance defines the tables/documents/code api. - - :param ModelDriver driver: model storage driver. - :param model_classes: the models to register. - """ - assert isinstance(driver, ModelDriver) - super(ModelStorage, self).__init__(driver, model_classes, **kwargs) - - def __getattr__(self, table): - """ - getattr is a shortcut to simple api - - for Example: - >> storage = ModelStorage(driver=FileSystemModelDriver('/tmp')) - >> node_table = storage.node - >> for node in node_table: - >> print node - - :param str table: table name to get - :return: a storage object that mapped to the table name - """ - return super(ModelStorage, self).__getattr__(table) - - def register(self, model_cls): - """ - Registers the model type in the resource storage manager. - :param model_cls: the model to register. - """ - model_name = generate_lower_name(model_cls) - model_api = _ModelApi(model_name, self.driver, model_cls) - self.registered[model_name] = model_api - - for pointer_schema_register in model_api.pointer_mapping.values(): - model_cls = pointer_schema_register.model_cls - self.register(model_cls) - -_Pointer = namedtuple('_Pointer', 'name, is_iter') - - -class _ModelApi(object): - def __init__(self, name, driver, model_cls): - """ - Managing the model in the storage, using the driver. - - :param basestring name: the name of the model. - :param ModelDriver driver: the driver which supports this model in the storage. - :param Model model_cls: table/document class model. - """ - assert isinstance(driver, ModelDriver) - assert issubclass(model_cls, Model) - self.name = name - self.driver = driver - self.model_cls = model_cls - self.pointer_mapping = {} - self._setup_pointers_mapping() - - def _setup_pointers_mapping(self): - for field_name, field_cls in vars(self.model_cls).items(): - if not(isinstance(field_cls, PointerField) and field_cls.type): - continue - pointer_key = _Pointer(field_name, is_iter=isinstance(field_cls, IterField)) - self.pointer_mapping[pointer_key] = self.__class__( - name=generate_lower_name(field_cls.type), - driver=self.driver, - model_cls=field_cls.type) - - def __iter__(self): - return self.iter() - - def __repr__(self): - return '{self.name}(driver={self.driver}, model={self.model_cls})'.format(self=self) - - def create(self): - """ - Creates the model in the storage. - """ - with self.driver as connection: - connection.create(self.name) - - def get(self, entry_id, **kwargs): - """ - Getter for the model from the storage. - - :param basestring entry_id: the id of the table/document. - :return: model instance - :rtype: Model - """ - with self.driver as connection: - data = connection.get( - name=self.name, - entry_id=entry_id, - **kwargs) - data.update(self._get_pointers(data, **kwargs)) - return self.model_cls(**data) - - def store(self, entry, **kwargs): - """ - Setter for the model in the storage. - - :param Model entry: the table/document to store. - """ - assert isinstance(entry, self.model_cls) - with self.driver as connection: - data = entry.fields_dict - data.update(self._store_pointers(data, **kwargs)) - connection.store( - name=self.name, - entry_id=entry.id, - entry=data, - **kwargs) - - def delete(self, entry_id, **kwargs): - """ - Delete the model from storage. - - :param basestring entry_id: id of the entity to delete from storage. - """ - entry = self.get(entry_id) - with self.driver as connection: - self._delete_pointers(entry, **kwargs) - connection.delete( - name=self.name, - entry_id=entry_id, - **kwargs) - - def iter(self, **kwargs): - """ - Generator over the entries of model in storage. - """ - with self.driver as connection: - for data in connection.iter(name=self.name, **kwargs): - data.update(self._get_pointers(data, **kwargs)) - yield self.model_cls(**data) - - def update(self, entry_id, **kwargs): - """ - Updates and entry in storage. - - :param str entry_id: the id of the table/document. - :param kwargs: the fields to update. - :return: - """ - with self.driver as connection: - connection.update( - name=self.name, - entry_id=entry_id, - **kwargs - ) - - def _get_pointers(self, data, **kwargs): - pointers = {} - for field, schema in self.pointer_mapping.items(): - if field.is_iter: - pointers[field.name] = [ - schema.get(entry_id=pointer_id, **kwargs) - for pointer_id in data[field.name] - if pointer_id] - elif data[field.name]: - pointers[field.name] = schema.get(entry_id=data[field.name], **kwargs) - return pointers - - def _store_pointers(self, data, **kwargs): - pointers = {} - for field, model_api in self.pointer_mapping.items(): - if field.is_iter: - pointers[field.name] = [] - for iter_entity in data[field.name]: - pointers[field.name].append(iter_entity.id) - model_api.store(iter_entity, **kwargs) - else: - pointers[field.name] = data[field.name].id - model_api.store(data[field.name], **kwargs) - return pointers - - def _delete_pointers(self, entry, **kwargs): - for field, schema in self.pointer_mapping.items(): - if field.is_iter: - for iter_entry in getattr(entry, field.name): - schema.delete(iter_entry.id, **kwargs) - else: - schema.delete(getattr(entry, field.name).id, **kwargs) - - -class ResourceApi(object): - """ - Managing the resource in the storage, using the driver. - - :param basestring name: the name of the resource. - :param ResourceDriver driver: the driver which supports this resource in the storage. - """ - def __init__(self, driver, resource_name): - """ - Managing the resources in the storage, using the driver. - - :param ResourceDriver driver: the driver which supports this model in the storage. - :param basestring resource_name: the type of the entry this resourceAPI manages. - """ - assert isinstance(driver, ResourceDriver) - self.driver = driver - self.resource_name = resource_name - - def __repr__(self): - return '{name}(driver={self.driver}, resource={self.resource_name})'.format( - name=self.__class__.__name__, self=self) - - def create(self): - """ - Create the resource dir in the storage. - """ - with self.driver as connection: - connection.create(self.resource_name) - - def data(self, entry_id, path=None, **kwargs): - """ - Retrieve the content of a storage resource. - - :param basestring entry_id: the id of the entry. - :param basestring path: path of the resource on the storage. - :param kwargs: resources to be passed to the driver.. - :return the content of a single file: - """ - with self.driver as connection: - return connection.data( - entry_type=self.resource_name, - entry_id=entry_id, - path=path, - **kwargs) - - def download(self, entry_id, destination, path=None, **kwargs): - """ - Download a file/dir from the resource storage. - - :param basestring entry_id: the id of the entry. - :param basestring destination: the destination of the file/dir. - :param basestring path: path of the resource on the storage. - """ - with self.driver as connection: - connection.download( - entry_type=self.resource_name, - entry_id=entry_id, - destination=destination, - path=path, - **kwargs) - - def upload(self, entry_id, source, path=None, **kwargs): - """ - Upload a file/dir from the resource storage. - - :param basestring entry_id: the id of the entry. - :param basestring source: the source path of the file to upload. - :param basestring path: the destination of the file, relative to the root dir - of the resource - """ - with self.driver as connection: - connection.upload( - entry_type=self.resource_name, - entry_id=entry_id, - source=source, - path=path, - **kwargs) - - -def generate_lower_name(model_cls): - """ - Generates the name of the class from the class object. e.g. SomeClass -> some_class - :param model_cls: the class to evaluate. - :return: lower name - :rtype: basestring - """ - return ''.join( - character if character.islower() else '_{0}'.format(character.lower()) - for character in model_cls.__name__)[1:] - - -class ResourceStorage(Storage): - """ - Managing the resource storage. - """ - def __init__(self, driver, resources=(), **kwargs): - """ - Simple storage client api for Aria applications. - The storage instance defines the tables/documents/code api. - - :param ResourceDriver driver: resource storage driver - :param resources: the resources to register. - """ - assert isinstance(driver, ResourceDriver) - super(ResourceStorage, self).__init__(driver, resources, **kwargs) - - def register(self, resource): - """ - Registers the resource type in the resource storage manager. - :param resource: the resource to register. - """ - self.registered[resource] = ResourceApi(self.driver, resource_name=resource) - - def __getattr__(self, resource): - """ - getattr is a shortcut to simple api - - for Example: - >> storage = ResourceStorage(driver=FileSystemResourceDriver('/tmp')) - >> blueprint_resources = storage.blueprint - >> blueprint_resources.download(blueprint_id, destination='~/blueprint/') - - :param str resource: resource name to download - :return: a storage object that mapped to the resource name - :rtype: ResourceApi - """ - return super(ResourceStorage, self).__getattr__(resource) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/storage/api.py ---------------------------------------------------------------------- diff --git a/aria/storage/api.py b/aria/storage/api.py new file mode 100644 index 0000000..d6fc3b8 --- /dev/null +++ b/aria/storage/api.py @@ -0,0 +1,182 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +General storage API +""" + + +class StorageAPI(object): + """ + General storage Base API + """ + def create(self, **kwargs): + """ + Create a storage API. + :param kwargs: + :return: + """ + raise NotImplementedError('Subclass must implement abstract create method') + + +class ModelAPI(StorageAPI): + """ + A Base object for the model. + """ + def __init__(self, model_cls, name=None, **kwargs): + """ + Base model API + + :param model_cls: the representing class of the model + :param str name: the name of the model + :param kwargs: + """ + super(ModelAPI, self).__init__(**kwargs) + self._model_cls = model_cls + self._name = name or generate_lower_name(model_cls) + + @property + def name(self): + """ + The name of the class + :return: name of the class + """ + return self._name + + @property + def model_cls(self): + """ + The class represting the model + :return: + """ + return self._model_cls + + def get(self, entry_id, filters=None, **kwargs): + """ + Get entry from storage. + + :param entry_id: + :param kwargs: + :return: + """ + raise NotImplementedError('Subclass must implement abstract get method') + + def put(self, entry, **kwargs): + """ + Store entry in storage + + :param entry: + :param kwargs: + :return: + """ + raise NotImplementedError('Subclass must implement abstract store method') + + def delete(self, entry_id, **kwargs): + """ + Delete entry from storage. + + :param entry_id: + :param kwargs: + :return: + """ + raise NotImplementedError('Subclass must implement abstract delete method') + + def __iter__(self): + return self.iter() + + def iter(self, **kwargs): + """ + Iter over the entries in storage. + + :param kwargs: + :return: + """ + raise NotImplementedError('Subclass must implement abstract iter method') + + def update(self, entry, **kwargs): + """ + Update entry in storage. + + :param entry: + :param kwargs: + :return: + """ + raise NotImplementedError('Subclass must implement abstract update method') + + +class ResourceAPI(StorageAPI): + """ + A Base object for the resource. + """ + def __init__(self, name): + """ + Base resource API + :param str name: the resource type + """ + self._name = name + + @property + def name(self): + """ + The name of the resource + :return: + """ + return self._name + + def read(self, entry_id, path=None, **kwargs): + """ + Get a bytesteam from the storage. + + :param entry_id: + :param path: + :param kwargs: + :return: + """ + raise NotImplementedError('Subclass must implement abstract data method') + + def download(self, entry_id, destination, path=None, **kwargs): + """ + Download a resource from the storage. + + :param entry_id: + :param destination: + :param path: + :param kwargs: + :return: + """ + raise NotImplementedError('Subclass must implement abstract download method') + + def upload(self, entry_id, source, path=None, **kwargs): + """ + Upload a resource to the storage. + + :param entry_id: + :param source: + :param path: + :param kwargs: + :return: + """ + raise NotImplementedError('Subclass must implement abstract upload method') + + +def generate_lower_name(model_cls): + """ + Generates the name of the class from the class object. e.g. SomeClass -> some_class + :param model_cls: the class to evaluate. + :return: lower name + :rtype: basestring + """ + return ''.join( + character if character.islower() else '_{0}'.format(character.lower()) + for character in model_cls.__name__)[1:] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/storage/core.py ---------------------------------------------------------------------- diff --git a/aria/storage/core.py b/aria/storage/core.py new file mode 100644 index 0000000..a5d3210 --- /dev/null +++ b/aria/storage/core.py @@ -0,0 +1,125 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Aria's storage Sub-Package +Path: aria.storage + +Storage package is a generic abstraction over different storage types. +We define this abstraction with the following components: + +1. storage: simple mapi to use +2. driver: implementation of the database client mapi. +3. model: defines the structure of the table/document. +4. field: defines a field/item in the model. + +API: + * application_storage_factory - function, default Aria storage factory. + * Storage - class, simple storage mapi. + * models - module, default Aria standard models. + * structures - module, default Aria structures - holds the base model, + and different fields types. + * Model - class, abstract model implementation. + * Field - class, base field implementation. + * IterField - class, base iterable field implementation. + * drivers - module, a pool of Aria standard drivers. + * StorageDriver - class, abstract model implementation. +""" + +from aria.logger import LoggerMixin +from . import api as storage_api + +__all__ = ( + 'Storage', + 'ModelStorage', + 'ResourceStorage' +) + + +class Storage(LoggerMixin): + """ + Represents the storage + """ + def __init__(self, api_cls, api_kwargs=None, items=(), **kwargs): + self._api_kwargs = api_kwargs or {} + super(Storage, self).__init__(**kwargs) + self.api = api_cls + self.registered = {} + for item in items: + self.register(item) + self.logger.debug('{name} object is ready: {0!r}'.format( + self, name=self.__class__.__name__)) + + def __repr__(self): + return '{name}(api={self.api})'.format(name=self.__class__.__name__, self=self) + + def __getattr__(self, item): + try: + return self.registered[item] + except KeyError: + return super(Storage, self).__getattribute__(item) + + def register(self, entry): + """ + Register the entry to the storage + :param name: + :return: + """ + raise NotImplementedError('Subclass must implement abstract register method') + + +class ResourceStorage(Storage): + """ + Represents resource storage. + """ + def register(self, name): + """ + Register the resource type to resource storage. + :param name: + :return: + """ + self.registered[name] = self.api(name=name, **self._api_kwargs) + self.registered[name].create() + self.logger.debug('setup {name} in storage {self!r}'.format(name=name, self=self)) + + +class ModelStorage(Storage): + """ + Represents model storage. + """ + def register(self, model_cls): + """ + Register the model into the model storage. + :param model_cls: the model to register. + :return: + """ + model_name = storage_api.generate_lower_name(model_cls) + if model_name in self.registered: + self.logger.debug('{name} in already storage {self!r}'.format(name=model_name, + self=self)) + return + self.registered[model_name] = self.api(name=model_name, + model_cls=model_cls, + **self._api_kwargs) + self.registered[model_name].create() + self.logger.debug('setup {name} in storage {self!r}'.format(name=model_name, self=self)) + + def drop(self): + """ + Drop all the tables from the model. + :return: + """ + for mapi in self.registered.values(): + mapi.drop() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/storage/drivers.py ---------------------------------------------------------------------- diff --git a/aria/storage/drivers.py b/aria/storage/drivers.py deleted file mode 100644 index 1f96956..0000000 --- a/aria/storage/drivers.py +++ /dev/null @@ -1,416 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Aria's storage.drivers module -Path: aria.storage.driver - -drivers module holds a generic abstract implementation of drivers. - -classes: - * Driver - abstract storage driver implementation. - * ModelDriver - abstract model base storage driver. - * ResourceDriver - abstract resource base storage driver. - * FileSystemModelDriver - file system implementation for model storage driver. - * FileSystemResourceDriver - file system implementation for resource storage driver. -""" - -import distutils.dir_util # pylint: disable=no-name-in-module, import-error -import os -import shutil -from functools import partial -from multiprocessing import RLock - -import jsonpickle - -from ..logger import LoggerMixin -from .exceptions import StorageError - -__all__ = ( - 'ModelDriver', - 'FileSystemModelDriver', - 'ResourceDriver', - 'FileSystemResourceDriver', -) - - -class Driver(LoggerMixin): - """ - Driver: storage driver context manager - abstract driver implementation. - In the implementation level, It is a good practice to raise StorageError on Errors. - """ - - def __enter__(self): - """ - Context manager entry method, executes connect. - :return: context manager instance - :rtype: Driver - """ - self.connect() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """ - Context manager exit method, executes disconnect. - """ - self.disconnect() - if not exc_type: - return - # self.logger.debug( - # '{name} had an error'.format(name=self.__class__.__name__), - # exc_info=(exc_type, exc_val, exc_tb)) - if StorageError in exc_type.mro(): - return - raise StorageError('Exception had occurred, {type}: {message}'.format( - type=exc_type, message=str(exc_val))) - - def connect(self): - """ - Open storage connection. - In some cases, This method can get the connection from a connection pool. - """ - pass - - def disconnect(self): - """ - Close storage connection. - In some cases, This method can release the connection to the connection pool. - """ - pass - - def create(self, name, *args, **kwargs): - """ - Create table/document in storage by name. - :param str name: name of table/document in storage. - """ - pass - - -class ModelDriver(Driver): - """ - ModelDriver context manager. - Base Driver for Model based storage. - """ - - def get(self, name, entry_id, **kwargs): - """ - Getter from storage. - :param str name: name of table/document in storage. - :param str entry_id: id of the document to get from storage. - :return: value of entity from the storage. - """ - raise NotImplementedError('Subclass must implement abstract get method') - - def delete(self, name, entry_id, **kwargs): - """ - Delete from storage. - :param str name: name of table/document in storage. - :param str entry_id: id of the entity to delete from storage. - :param dict kwargs: extra kwargs if needed. - """ - raise NotImplementedError('Subclass must implement abstract delete method') - - def store(self, name, entry_id, entry, **kwargs): - """ - Setter to storage. - :param str name: name of table/document in storage. - :param str entry_id: id of the entity to store in the storage. - :param dict entry: content to store. - """ - raise NotImplementedError('Subclass must implement abstract store method') - - def iter(self, name, **kwargs): - """ - Generator over the entries of table/document in storage. - :param str name: name of table/document/file in storage to iter over. - """ - raise NotImplementedError('Subclass must implement abstract iter method') - - def update(self, name, entry_id, **kwargs): - """ - Updates and entry in storage. - - :param str name: name of table/document in storage. - :param str entry_id: id of the document to get from storage. - :param kwargs: the fields to update. - :return: - """ - raise NotImplementedError('Subclass must implement abstract store method') - - -class ResourceDriver(Driver): - """ - ResourceDriver context manager. - Base Driver for Resource based storage. - - Resource storage structure is a file system base. - <resource root directory>/<resource_name>/<entry_id>/<entry> - entry: can be one single file or multiple files and directories. - """ - - def data(self, entry_type, entry_id, path=None, **kwargs): - """ - Get the binary data from a file in a resource entry. - If the entry is a single file no path needed, - If the entry contain number of files the path will gide to the relevant file. - - resource path: - <resource root directory>/<name>/<entry_id>/<path> - - :param basestring entry_type: resource name. - :param basestring entry_id: id of the entity to resource in the storage. - :param basestring path: path to resource relative to entry_id folder in the storage. - :return: entry file object. - :rtype: bytes - """ - raise NotImplementedError('Subclass must implement abstract get method') - - def download(self, entry_type, entry_id, destination, path=None, **kwargs): - """ - Download the resource to a destination. - Like data method bat this method isn't returning data, - Instead it create a new file in local file system. - - resource path: - <resource root directory>/<name>/<entry_id>/<path> - copy to: - /<destination> - destination can be file or directory - - :param basestring entry_type: resource name. - :param basestring entry_id: id of the entity to resource in the storage. - :param basestring destination: path in local file system to download to. - :param basestring path: path to resource relative to entry_id folder in the storage. - """ - raise NotImplementedError('Subclass must implement abstract get method') - - def upload(self, entry_type, entry_id, source, path=None, **kwargs): - """ - Upload the resource from source. - source can be file or directory with files. - - copy from: - /<source> - to resource path: - <resource root directory>/<name>/<entry_id>/<path> - - :param basestring entry_type: resource name. - :param basestring entry_id: id of the entity to resource in the storage. - :param basestring source: source can be file or directory with files. - :param basestring path: path to resource relative to entry_id folder in the storage. - """ - raise NotImplementedError('Subclass must implement abstract get method') - - -class BaseFileSystemDriver(Driver): - """ - Base class which handles storage on the file system. - """ - def __init__(self, *args, **kwargs): - super(BaseFileSystemDriver, self).__init__(*args, **kwargs) - self._lock = RLock() - - def connect(self): - self._lock.acquire() - - def disconnect(self): - self._lock.release() - - def __getstate__(self): - obj_dict = super(BaseFileSystemDriver, self).__getstate__() - del obj_dict['_lock'] - return obj_dict - - def __setstate__(self, obj_dict): - super(BaseFileSystemDriver, self).__setstate__(obj_dict) - vars(self).update(_lock=RLock(), **obj_dict) - - -class FileSystemModelDriver(ModelDriver, BaseFileSystemDriver): - """ - FileSystemModelDriver context manager. - """ - - def __init__(self, directory, **kwargs): - """ - File system implementation for storage driver. - :param str directory: root dir for storage. - """ - super(FileSystemModelDriver, self).__init__(**kwargs) - self.directory = directory - - self._join_path = partial(os.path.join, self.directory) - - def __repr__(self): - return '{cls.__name__}(directory={self.directory})'.format( - cls=self.__class__, self=self) - - def create(self, name): - """ - Create directory in storage by path. - tries to create the root directory as well. - :param str name: path of file in storage. - """ - try: - os.makedirs(self.directory) - except (OSError, IOError): - pass - os.makedirs(self._join_path(name)) - - def get(self, name, entry_id, **kwargs): - """ - Getter from storage. - :param str name: name of directory in storage. - :param str entry_id: id of the file to get from storage. - :return: value of file from storage. - :rtype: dict - """ - with open(self._join_path(name, entry_id)) as file_obj: - return jsonpickle.loads(file_obj.read()) - - def store(self, name, entry_id, entry, **kwargs): - """ - Delete from storage. - :param str name: name of directory in storage. - :param str entry_id: id of the file to delete from storage. - """ - with open(self._join_path(name, entry_id), 'w') as file_obj: - file_obj.write(jsonpickle.dumps(entry)) - - def delete(self, name, entry_id, **kwargs): - """ - Delete from storage. - :param str name: name of directory in storage. - :param str entry_id: id of the file to delete from storage. - """ - os.remove(self._join_path(name, entry_id)) - - def iter(self, name, filters=None, **kwargs): - """ - Generator over the entries of directory in storage. - :param str name: name of directory in storage to iter over. - :param dict filters: filters for query - """ - filters = filters or {} - - for entry_id in os.listdir(self._join_path(name)): - value = self.get(name, entry_id=entry_id) - for filter_name, filter_value in filters.items(): - if value.get(filter_name) != filter_value: - break - else: - yield value - - def update(self, name, entry_id, **kwargs): - """ - Updates and entry in storage. - - :param str name: name of table/document in storage. - :param str entry_id: id of the document to get from storage. - :param kwargs: the fields to update. - :return: - """ - entry_dict = self.get(name, entry_id) - entry_dict.update(**kwargs) - self.store(name, entry_id, entry_dict) - - -class FileSystemResourceDriver(ResourceDriver, BaseFileSystemDriver): - """ - FileSystemResourceDriver context manager. - """ - - def __init__(self, directory, **kwargs): - """ - File system implementation for storage driver. - :param str directory: root dir for storage. - """ - super(FileSystemResourceDriver, self).__init__(**kwargs) - self.directory = directory - self._join_path = partial(os.path.join, self.directory) - - def __repr__(self): - return '{cls.__name__}(directory={self.directory})'.format( - cls=self.__class__, self=self) - - def create(self, name): - """ - Create directory in storage by path. - tries to create the root directory as well. - :param basestring name: path of file in storage. - """ - try: - os.makedirs(self.directory) - except (OSError, IOError): - pass - os.makedirs(self._join_path(name)) - - def data(self, entry_type, entry_id, path=None): - """ - Retrieve the content of a file system storage resource. - - :param basestring entry_type: the type of the entry. - :param basestring entry_id: the id of the entry. - :param basestring path: a path to a specific resource. - :return: the content of the file - :rtype: bytes - """ - resource_relative_path = os.path.join(entry_type, entry_id, path or '') - resource = os.path.join(self.directory, resource_relative_path) - if not os.path.exists(resource): - raise StorageError("Resource {0} does not exist".format(resource_relative_path)) - if not os.path.isfile(resource): - resources = os.listdir(resource) - if len(resources) != 1: - raise StorageError('No resource in path: {0}'.format(resource)) - resource = os.path.join(resource, resources[0]) - with open(resource, 'rb') as resource_file: - return resource_file.read() - - def download(self, entry_type, entry_id, destination, path=None): - """ - Download a specific file or dir from the file system resource storage. - - :param basestring entry_type: the name of the entry. - :param basestring entry_id: the id of the entry - :param basestring destination: the destination of the files. - :param basestring path: a path on the remote machine relative to the root of the entry. - """ - resource_relative_path = os.path.join(entry_type, entry_id, path or '') - resource = os.path.join(self.directory, resource_relative_path) - if not os.path.exists(resource): - raise StorageError("Resource {0} does not exist".format(resource_relative_path)) - if os.path.isfile(resource): - shutil.copy2(resource, destination) - else: - distutils.dir_util.copy_tree(resource, destination) # pylint: disable=no-member - - def upload(self, entry_type, entry_id, source, path=None): - """ - Uploads a specific file or dir to the file system resource storage. - - :param basestring entry_type: the name of the entry. - :param basestring entry_id: the id of the entry - :param source: the source of the files to upload. - :param path: the destination of the file/s relative to the entry root dir. - """ - resource_directory = os.path.join(self.directory, entry_type, entry_id) - if not os.path.exists(resource_directory): - os.makedirs(resource_directory) - destination = os.path.join(resource_directory, path or '') - if os.path.isfile(source): - shutil.copy2(source, destination) - else: - distutils.dir_util.copy_tree(source, destination) # pylint: disable=no-member http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/storage/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/storage/exceptions.py b/aria/storage/exceptions.py index 22dfc50..f982f63 100644 --- a/aria/storage/exceptions.py +++ b/aria/storage/exceptions.py @@ -12,7 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +""" +Storage based exceptions +""" from .. import exceptions http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c6c92ae5/aria/storage/filesystem_rapi.py ---------------------------------------------------------------------- diff --git a/aria/storage/filesystem_rapi.py b/aria/storage/filesystem_rapi.py new file mode 100644 index 0000000..f810f58 --- /dev/null +++ b/aria/storage/filesystem_rapi.py @@ -0,0 +1,150 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +SQLalchemy based RAPI +""" +import os +import shutil +from contextlib import contextmanager +from functools import partial +from distutils import dir_util # https://github.com/PyCQA/pylint/issues/73; pylint: disable=no-name-in-module +from multiprocessing import RLock + +from aria.storage import ( + api, + exceptions +) + + +class FileSystemResourceAPI(api.ResourceAPI): + """ + File system resource storage. + """ + + def __init__(self, directory, **kwargs): + """ + File system implementation for storage api. + :param str directory: root dir for storage. + """ + super(FileSystemResourceAPI, self).__init__(**kwargs) + self.directory = directory + self.base_path = os.path.join(self.directory, self.name) + self._join_path = partial(os.path.join, self.base_path) + self._lock = RLock() + + @contextmanager + def connect(self): + """ + Established a connection and destroys it after use. + :return: + """ + try: + self._establish_connection() + yield self + except BaseException as e: + raise exceptions.StorageError(str(e)) + finally: + self._destroy_connection() + + def _establish_connection(self): + """ + Establish a conenction. used in the 'connect' contextmanager. + :return: + """ + self._lock.acquire() + + + def _destroy_connection(self): + """ + Destroy a connection. used in the 'connect' contextmanager. + :return: + """ + self._lock.release() + + def __repr__(self): + return '{cls.__name__}(directory={self.directory})'.format( + cls=self.__class__, self=self) + + def create(self, **kwargs): + """ + Create directory in storage by path. + tries to create the root directory as well. + :param str name: path of file in storage. + """ + try: + os.makedirs(self.directory) + except (OSError, IOError): + pass + os.makedirs(self.base_path) + + def read(self, entry_id, path=None, **_): + """ + Retrieve the content of a file system storage resource. + + :param str entry_type: the type of the entry. + :param str entry_id: the id of the entry. + :param str path: a path to a specific resource. + :return: the content of the file + :rtype: bytes + """ + resource_relative_path = os.path.join(self.name, entry_id, path or '') + resource = os.path.join(self.directory, resource_relative_path) + if not os.path.exists(resource): + raise exceptions.StorageError("Resource {0} does not exist". + format(resource_relative_path)) + if not os.path.isfile(resource): + resources = os.listdir(resource) + if len(resources) != 1: + raise exceptions.StorageError('No resource in path: {0}'.format(resource)) + resource = os.path.join(resource, resources[0]) + with open(resource, 'rb') as resource_file: + return resource_file.read() + + def download(self, entry_id, destination, path=None, **_): + """ + Download a specific file or dir from the file system resource storage. + + :param str entry_type: the name of the entry. + :param str entry_id: the id of the entry + :param str destination: the destination of the files. + :param str path: a path on the remote machine relative to the root of the entry. + """ + resource_relative_path = os.path.join(self.name, entry_id, path or '') + resource = os.path.join(self.directory, resource_relative_path) + if not os.path.exists(resource): + raise exceptions.StorageError("Resource {0} does not exist". + format(resource_relative_path)) + if os.path.isfile(resource): + shutil.copy2(resource, destination) + else: + dir_util.copy_tree(resource, destination) # pylint: disable=no-member + + def upload(self, entry_id, source, path=None, **_): + """ + Uploads a specific file or dir to the file system resource storage. + + :param str entry_type: the name of the entry. + :param str entry_id: the id of the entry + :param source: the source of the files to upload. + :param path: the destination of the file/s relative to the entry root dir. + """ + resource_directory = os.path.join(self.directory, self.name, entry_id) + if not os.path.exists(resource_directory): + os.makedirs(resource_directory) + destination = os.path.join(resource_directory, path or '') + if os.path.isfile(source): + shutil.copy2(source, destination) + else: + dir_util.copy_tree(source, destination) # pylint: disable=no-member
