Aria 21 reorder repository sturcutre Additional changes: - some importing issues in the parser
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/17510b02 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/17510b02 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/17510b02 Branch: refs/heads/ARIA-21-reorder-repository-sturcutre Commit: 17510b02ae7957855d8fde16c8af699373154115 Parents: 8ee1470 Author: mxmrlv <mxm...@gmail.com> Authored: Tue Nov 15 19:55:00 2016 +0200 Committer: mxmrlv <mxm...@gmail.com> Committed: Wed Nov 16 12:52:34 2016 +0200 ---------------------------------------------------------------------- aria/__init__.py | 17 +- aria/cli/commands.py | 22 +- aria/context/__init__.py | 21 - aria/context/common.py | 147 ---- aria/context/operation.py | 127 ---- aria/context/toolbelt.py | 75 -- aria/context/workflow.py | 119 --- aria/decorators.py | 73 -- aria/events/__init__.py | 57 -- aria/events/builtin_event_handler.py | 123 --- aria/events/workflow_engine_event_handler.py | 74 -- aria/exceptions.py | 9 - aria/orchestrator/__init__.py | 23 + aria/orchestrator/context/__init__.py | 21 + aria/orchestrator/context/common.py | 145 ++++ aria/orchestrator/context/exceptions.py | 23 + aria/orchestrator/context/operation.py | 127 ++++ aria/orchestrator/context/toolbelt.py | 75 ++ aria/orchestrator/context/workflow.py | 113 +++ aria/orchestrator/decorators.py | 74 ++ aria/orchestrator/events/__init__.py | 57 ++ .../events/builtin_event_handler.py | 123 +++ .../events/workflow_engine_event_handler.py | 74 ++ aria/orchestrator/exceptions.py | 20 + aria/orchestrator/workflows/__init__.py | 14 + aria/orchestrator/workflows/api/__init__.py | 20 + aria/orchestrator/workflows/api/task.py | 171 +++++ aria/orchestrator/workflows/api/task_graph.py | 290 ++++++++ aria/orchestrator/workflows/builtin/__init__.py | 31 + .../workflows/builtin/execute_operation.py | 104 +++ aria/orchestrator/workflows/builtin/heal.py | 174 +++++ aria/orchestrator/workflows/builtin/install.py | 53 ++ .../orchestrator/workflows/builtin/uninstall.py | 52 ++ .../orchestrator/workflows/builtin/workflows.py | 215 ++++++ aria/orchestrator/workflows/core/__init__.py | 20 + aria/orchestrator/workflows/core/engine.py | 116 +++ aria/orchestrator/workflows/core/task.py | 243 ++++++ aria/orchestrator/workflows/core/translation.py | 106 +++ aria/orchestrator/workflows/exceptions.py | 71 ++ .../orchestrator/workflows/executor/__init__.py | 22 + aria/orchestrator/workflows/executor/base.py | 54 ++ .../orchestrator/workflows/executor/blocking.py | 36 + aria/orchestrator/workflows/executor/celery.py | 97 +++ .../workflows/executor/multiprocess.py | 98 +++ aria/orchestrator/workflows/executor/thread.py | 65 ++ aria/parser/modeling/context.py | 5 +- aria/parser/modeling/elements.py | 6 +- aria/parser/modeling/instance_elements.py | 6 +- aria/parser/modeling/model_elements.py | 7 +- aria/parser/modeling/types.py | 6 +- aria/parser/modeling/utils.py | 7 +- aria/parser/presentation/fields.py | 5 +- aria/parser/reading/json.py | 5 +- aria/parser/reading/yaml.py | 5 +- aria/parser/specification.py | 6 +- aria/parser/tools/rest.py | 6 +- aria/parser/utils/caching.py | 6 +- aria/parser/utils/collections.py | 6 +- aria/parser/utils/daemon.py | 6 +- aria/parser/utils/formatting.py | 6 +- aria/parser/utils/rest_server.py | 6 +- aria/parser/validation/issue.py | 5 +- aria/storage/__init__.py | 2 +- aria/storage/drivers.py | 6 +- aria/storage/exceptions.py | 23 + aria/storage/structures.py | 5 +- aria/tools/__init__.py | 6 + aria/tools/application.py | 4 +- aria/tools/process.py | 2 +- aria/workflows/__init__.py | 14 - aria/workflows/api/__init__.py | 20 - aria/workflows/api/task.py | 172 ----- aria/workflows/api/task_graph.py | 290 -------- aria/workflows/builtin/__init__.py | 31 - aria/workflows/builtin/execute_operation.py | 104 --- aria/workflows/builtin/heal.py | 174 ----- aria/workflows/builtin/install.py | 53 -- aria/workflows/builtin/uninstall.py | 52 -- aria/workflows/builtin/workflows.py | 215 ------ aria/workflows/core/__init__.py | 20 - aria/workflows/core/engine.py | 114 --- aria/workflows/core/task.py | 242 ------ aria/workflows/core/translation.py | 106 --- aria/workflows/exceptions.py | 70 -- aria/workflows/executor/__init__.py | 21 - aria/workflows/executor/base.py | 54 -- aria/workflows/executor/blocking.py | 36 - aria/workflows/executor/celery.py | 97 --- aria/workflows/executor/multiprocess.py | 98 --- aria/workflows/executor/thread.py | 65 -- .../simple_v1_0/data_types.py | 5 +- .../simple_v1_0/modeling/artifacts.py | 5 +- .../simple_v1_0/modeling/capabilities.py | 5 +- .../simple_v1_0/modeling/data_types.py | 5 +- .../simple_v1_0/modeling/interfaces.py | 5 +- .../simple_v1_0/modeling/properties.py | 5 +- .../simple_v1_0/modeling/requirements.py | 6 +- tests/context/__init__.py | 35 - tests/context/test_operation.py | 156 ---- tests/context/test_toolbelt.py | 171 ----- tests/context/test_workflow.py | 62 -- tests/events/__init__.py | 14 - tests/events/test_builtin_event_handlers.py | 14 - .../test_workflow_enginge_event_handlers.py | 0 tests/mock/context.py | 3 +- tests/orchestrator/__init__.py | 14 + tests/orchestrator/context/__init__.py | 33 + tests/orchestrator/context/test_operation.py | 156 ++++ tests/orchestrator/context/test_toolbelt.py | 172 +++++ tests/orchestrator/context/test_workflow.py | 63 ++ tests/orchestrator/events/__init__.py | 14 + .../events/test_builtin_event_handlers.py | 14 + .../test_workflow_enginge_event_handlers.py | 0 tests/orchestrator/workflows/__init__.py | 16 + tests/orchestrator/workflows/api/__init__.py | 14 + tests/orchestrator/workflows/api/test_task.py | 150 ++++ .../workflows/api/test_task_graph.py | 745 +++++++++++++++++++ .../orchestrator/workflows/builtin/__init__.py | 86 +++ .../workflows/builtin/test_execute_operation.py | 51 ++ .../orchestrator/workflows/builtin/test_heal.py | 88 +++ .../workflows/builtin/test_install.py | 39 + .../workflows/builtin/test_uninstall.py | 39 + tests/orchestrator/workflows/core/__init__.py | 14 + .../orchestrator/workflows/core/test_engine.py | 435 +++++++++++ tests/orchestrator/workflows/core/test_task.py | 113 +++ .../test_task_graph_into_exececution_graph.py | 105 +++ .../orchestrator/workflows/executor/__init__.py | 14 + .../workflows/executor/test_executor.py | 141 ++++ tests/storage/test_drivers.py | 2 +- tests/storage/test_model_storage.py | 2 +- tests/storage/test_models.py | 2 +- tests/storage/test_models_api.py | 2 +- tests/storage/test_resource_storage.py | 2 +- tests/test_logger.py | 1 - tests/workflows/__init__.py | 16 - tests/workflows/api/__init__.py | 14 - tests/workflows/api/test_task.py | 150 ---- tests/workflows/api/test_task_graph.py | 745 ------------------- tests/workflows/builtin/__init__.py | 86 --- .../workflows/builtin/test_execute_operation.py | 51 -- tests/workflows/builtin/test_heal.py | 88 --- tests/workflows/builtin/test_install.py | 39 - tests/workflows/builtin/test_uninstall.py | 39 - tests/workflows/core/__init__.py | 14 - tests/workflows/core/test_engine.py | 433 ----------- tests/workflows/core/test_task.py | 113 --- .../test_task_graph_into_exececution_graph.py | 105 --- tests/workflows/executor/__init__.py | 14 - tests/workflows/executor/test_executor.py | 141 ---- 149 files changed, 5605 insertions(+), 5447 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/__init__.py ---------------------------------------------------------------------- diff --git a/aria/__init__.py b/aria/__init__.py index eca7b9b..b2d1157 100644 --- a/aria/__init__.py +++ b/aria/__init__.py @@ -18,15 +18,16 @@ Aria top level package """ from .VERSION import version as __version__ -from .storage.drivers import ( - ResourceDriver, - ModelDriver, - FileSystemModelDriver, - FileSystemResourceDriver, -) -from .storage import ModelStorage, ResourceStorage, models -from .decorators import workflow, operation +from .orchestrator.decorators import workflow, operation +from .storage import ModelStorage, ResourceStorage, models, ModelDriver, ResourceDriver +from . import ( + tools, + parser, + storage, + orchestrator, + cli +) __all__ = ( '__version__', 'workflow', http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/cli/commands.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands.py b/aria/cli/commands.py index d3698fd..2fff4f0 100644 --- a/aria/cli/commands.py +++ b/aria/cli/commands.py @@ -23,16 +23,22 @@ import sys from glob import glob from importlib import import_module +from dsl_parser.parser import parse_from_path +from dsl_parser.tasks import prepare_deployment_plan from yaml import safe_load, YAMLError from aria import application_model_storage, application_resource_storage +from aria.orchestrator.context.workflow import WorkflowContext from aria.logger import LoggerMixin from aria.storage import FileSystemModelDriver, FileSystemResourceDriver from aria.tools.application import StorageManager -from aria.context.workflow import WorkflowContext -from aria.workflows.core.engine import Engine -from aria.workflows.executor.thread import ThreadExecutor - +from aria.orchestrator.workflows.core.engine import Engine +from aria.orchestrator.workflows.executor.thread import ThreadExecutor +from .exceptions import ( + AriaCliFormatInputsError, + AriaCliYAMLInputsError, + AriaCliInvalidInputsError +) from .storage import ( local_resource_storage, create_local_storage, @@ -42,16 +48,8 @@ from .storage import ( local_storage, ) -from .exceptions import ( - AriaCliFormatInputsError, - AriaCliYAMLInputsError, - AriaCliInvalidInputsError -) ####################################### -from dsl_parser.parser import parse_from_path -from dsl_parser.tasks import prepare_deployment_plan -####################################### class BaseCommand(LoggerMixin): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/context/__init__.py ---------------------------------------------------------------------- diff --git a/aria/context/__init__.py b/aria/context/__init__.py deleted file mode 100644 index ad89b13..0000000 --- a/aria/context/__init__.py +++ /dev/null @@ -1,21 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Provides contexts to workflow and operation -""" - -from . import workflow, operation -from .toolbelt import toolbelt http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/context/common.py ---------------------------------------------------------------------- diff --git a/aria/context/common.py b/aria/context/common.py deleted file mode 100644 index 6e9b86a..0000000 --- a/aria/context/common.py +++ /dev/null @@ -1,147 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -A common context for both workflow and operation -""" -from uuid import uuid4 - -from .. import ( - logger, - exceptions, -) -from ..tools.lru_cache import lru_cache - - -class BaseContext(logger.LoggerMixin): - """ - Base context object for workflow and operation - """ - - def __init__( - self, - name, - model_storage, - resource_storage, - deployment_id, - workflow_id, - execution_id=None, - task_max_attempts=1, - task_retry_interval=0, - task_ignore_failure=False, - **kwargs): - super(BaseContext, self).__init__(**kwargs) - self._name = name - self._id = str(uuid4()) - self._model = model_storage - self._resource = resource_storage - self._deployment_id = deployment_id - self._workflow_id = workflow_id - self._execution_id = execution_id or str(uuid4()) - self._task_max_attempts = task_max_attempts - self._task_retry_interval = task_retry_interval - self._task_ignore_failure = task_ignore_failure - - def __repr__(self): - return ( - '{name}(name={self.name}, ' - 'deployment_id={self._deployment_id}, ' - 'workflow_id={self._workflow_id}, ' - 'execution_id={self._execution_id})' - .format(name=self.__class__.__name__, self=self)) - - @property - def model(self): - """ - Access to the model storage - :return: - """ - return self._model - - @property - def resource(self): - """ - Access to the resource storage - :return: - """ - return self._resource - - @property - @lru_cache() - def blueprint(self): - """ - The blueprint model - """ - return self.model.blueprint.get(self.deployment.blueprint_id) - - @property - @lru_cache() - def deployment(self): - """ - The deployment model - """ - return self.model.deployment.get(self._deployment_id) - - @property - def execution(self): - """ - The execution model - """ - return self.model.execution.get(self._execution_id) - - @execution.setter - def execution(self, value): - """ - Store the execution in the model storage - """ - self.model.execution.store(value) - - @property - def name(self): - """ - The operation name - :return: - """ - return self._name - - @property - def id(self): - """ - The operation id - :return: - """ - return self._id - - def download_resource(self, destination, path=None): - """ - Download a blueprint resource from the resource storage - """ - try: - return self.resource.deployment.download(entry_id=self.deployment.id, - destination=destination, - path=path) - except exceptions.StorageError: - return self.resource.blueprint.download(entry_id=self.blueprint.id, - destination=destination, - path=path) - - @lru_cache() - def get_resource(self, path=None): - """ - Read a deployment resource as string from the resource storage - """ - try: - return self.resource.deployment.data(entry_id=self.deployment.id, path=path) - except exceptions.StorageError: - return self.resource.blueprint.data(entry_id=self.blueprint.id, path=path) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/context/operation.py b/aria/context/operation.py deleted file mode 100644 index bf3686d..0000000 --- a/aria/context/operation.py +++ /dev/null @@ -1,127 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Workflow and operation contexts -""" - - -from .common import BaseContext - - -class BaseOperationContext(BaseContext): - """ - Context object used during operation creation and execution - """ - - def __init__(self, name, workflow_context, task, **kwargs): - super(BaseOperationContext, self).__init__( - name=name, - model_storage=workflow_context.model, - resource_storage=workflow_context.resource, - deployment_id=workflow_context._deployment_id, - workflow_id=workflow_context._workflow_id, - execution_id=workflow_context._execution_id, - **kwargs) - self._task_model = task - self._actor = self.task.actor - - def __repr__(self): - details = 'operation_mapping={task.operation_mapping}; ' \ - 'operation_inputs={task.inputs}'\ - .format(task=self.task) - return '{name}({0})'.format(details, name=self.name) - - @property - def task(self): - """ - The task in the model storage - :return: Task model - """ - return self._task_model - - -class NodeOperationContext(BaseOperationContext): - """ - Context for node based operations. - """ - @property - def node(self): - """ - the node of the current operation - :return: - """ - return self._actor.node - - @property - def node_instance(self): - """ - The node instance of the current operation - :return: - """ - return self._actor - - -class RelationshipOperationContext(BaseOperationContext): - """ - Context for relationship based operations. - """ - @property - def source_node(self): - """ - The source node - :return: - """ - return self.model.node.get(self.relationship.source_id) - - @property - def source_node_instance(self): - """ - The source node instance - :return: - """ - return self.model.node_instance.get(self.relationship_instance.source_id) - - @property - def target_node(self): - """ - The target node - :return: - """ - return self.model.node.get(self.relationship.target_id) - - @property - def target_node_instance(self): - """ - The target node instance - :return: - """ - return self.model.node_instance.get(self._actor.target_id) - - @property - def relationship(self): - """ - The relationship of the current operation - :return: - """ - return self._actor.relationship - - @property - def relationship_instance(self): - """ - The relationship instance of the current operation - :return: - """ - return self._actor http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/context/toolbelt.py ---------------------------------------------------------------------- diff --git a/aria/context/toolbelt.py b/aria/context/toolbelt.py deleted file mode 100644 index 0aad89c..0000000 --- a/aria/context/toolbelt.py +++ /dev/null @@ -1,75 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -Provides with different tools for operations. -""" - -from . import operation - - -class NodeToolBelt(object): - """ - Node operation related tool belt - """ - def __init__(self, operation_context): - self._op_context = operation_context - - @property - def dependent_node_instances(self): - """ - Any node instance which has a relationship to the current node instance. - :return: - """ - assert isinstance(self._op_context, operation.NodeOperationContext) - node_instances = self._op_context.model.node_instance.iter( - filters={'deployment_id': self._op_context.deployment.id} - ) - for node_instance in node_instances: - for relationship_instance in node_instance.relationship_instances: - if relationship_instance.target_id == self._op_context.node_instance.id: - yield node_instance - - @property - def host_ip(self): - """ - The host ip of the current node - :return: - """ - assert isinstance(self._op_context, operation.NodeOperationContext) - host_id = self._op_context._actor.host_id - host_instance = self._op_context.model.node_instance.get(host_id) - return host_instance.runtime_properties.get('ip') - - -class RelationshipToolBelt(object): - """ - Relationship operation related tool belt - """ - def __init__(self, operation_context): - self._op_context = operation_context - - -def toolbelt(operation_context): - """ - Get a toolbelt according to the current operation executor - :param operation_context: - :return: - """ - if isinstance(operation_context, operation.NodeOperationContext): - return NodeToolBelt(operation_context) - elif isinstance(operation_context, operation.RelationshipOperationContext): - return RelationshipToolBelt(operation_context) - else: - raise RuntimeError("Operation context not supported") http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/context/workflow.py b/aria/context/workflow.py deleted file mode 100644 index 0495bdc..0000000 --- a/aria/context/workflow.py +++ /dev/null @@ -1,119 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Workflow and operation contexts -""" - -import threading -from contextlib import contextmanager - -from aria import exceptions - -from .common import BaseContext - - -class ContextException(exceptions.AriaError): - """ - Context based exception - """ - pass - - -class WorkflowContext(BaseContext): - """ - Context object used during workflow creation and execution - """ - def __init__(self, parameters=None, *args, **kwargs): - super(WorkflowContext, self).__init__(*args, **kwargs) - self.parameters = parameters or {} - # TODO: execution creation should happen somewhere else - # should be moved there, when such logical place exists - try: - self.model.execution.get(self._execution_id) - except exceptions.StorageError: - self._create_execution() - - def __repr__(self): - return ( - '{name}(deployment_id={self._deployment_id}, ' - 'workflow_id={self._workflow_id}, ' - 'execution_id={self._execution_id})'.format( - name=self.__class__.__name__, self=self)) - - def _create_execution(self): - execution_cls = self.model.execution.model_cls - execution = self.model.execution.model_cls( - id=self._execution_id, - deployment_id=self.deployment.id, - workflow_id=self._workflow_id, - blueprint_id=self.blueprint.id, - status=execution_cls.PENDING, - parameters=self.parameters, - ) - self.model.execution.store(execution) - - @property - def nodes(self): - """ - Iterator over nodes - """ - return self.model.node.iter(filters={'blueprint_id': self.blueprint.id}) - - @property - def node_instances(self): - """ - Iterator over node instances - """ - return self.model.node_instance.iter(filters={'deployment_id': self.deployment.id}) - - -class _CurrentContext(threading.local): - """ - Provides thread-level context, which sugarcoats the task api. - """ - - def __init__(self): - super(_CurrentContext, self).__init__() - self._workflow_context = None - - def _set(self, value): - self._workflow_context = value - - def get(self): - """ - Retrieves the current workflow context - :return: the workflow context - :rtype: WorkflowContext - """ - if self._workflow_context is not None: - return self._workflow_context - raise ContextException("No context was set") - - @contextmanager - def push(self, workflow_context): - """ - Switches the current context to the provided context - :param workflow_context: the context to switch to. - :yields: the current context - """ - prev_workflow_context = self._workflow_context - self._set(workflow_context) - try: - yield self - finally: - self._set(prev_workflow_context) - -current = _CurrentContext() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/decorators.py ---------------------------------------------------------------------- diff --git a/aria/decorators.py b/aria/decorators.py deleted file mode 100644 index 8bde0ef..0000000 --- a/aria/decorators.py +++ /dev/null @@ -1,73 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Workflow and operation decorators -""" - -from uuid import uuid4 -from functools import partial, wraps - -from . import context -from .workflows.api import task_graph -from .tools.validation import validate_function_arguments - - -def workflow(func=None, suffix_template=''): - """ - Workflow decorator - """ - if func is None: - return partial(workflow, suffix_template=suffix_template) - - @wraps(func) - def _wrapper(ctx, **workflow_parameters): - - workflow_name = _generate_name( - func_name=func.__name__, - suffix_template=suffix_template, - ctx=ctx, - **workflow_parameters) - - workflow_parameters.setdefault('ctx', ctx) - workflow_parameters.setdefault('graph', task_graph.TaskGraph(workflow_name)) - validate_function_arguments(func, workflow_parameters) - with context.workflow.current.push(ctx): - func(**workflow_parameters) - return workflow_parameters['graph'] - return _wrapper - - -def operation(func=None, toolbelt=False, suffix_template=''): - """ - Operation decorator - """ - if func is None: - return partial(operation, suffix_template=suffix_template, toolbelt=toolbelt) - - @wraps(func) - def _wrapper(**func_kwargs): - if toolbelt: - operation_toolbelt = context.toolbelt(func_kwargs['ctx']) - func_kwargs.setdefault('toolbelt', operation_toolbelt) - validate_function_arguments(func, func_kwargs) - return func(**func_kwargs) - return _wrapper - - -def _generate_name(func_name, ctx, suffix_template, **custom_kwargs): - return '{func_name}.{suffix}'.format( - func_name=func_name, - suffix=suffix_template.format(ctx=ctx, **custom_kwargs) or str(uuid4())) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/events/__init__.py ---------------------------------------------------------------------- diff --git a/aria/events/__init__.py b/aria/events/__init__.py deleted file mode 100644 index 2e88733..0000000 --- a/aria/events/__init__.py +++ /dev/null @@ -1,57 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -ARIA's events Sub-Package -Path: aria.events - -Events package provides events mechanism for different executions in aria. - - -1. storage_event_handler: implementation of storage handlers for workflow and operation events. -2. logger_event_handler: implementation of logger handlers for workflow and operation events. - -API: - * start_task_signal - * on_success_task_signal - * on_failure_task_signal - * start_workflow_signal - * on_success_workflow_signal - * on_failure_workflow_signal -""" - -import os - -from blinker import signal - -from ..tools.plugin import plugin_installer - -# workflow engine task signals: -sent_task_signal = signal('sent_task_signal') -start_task_signal = signal('start_task_signal') -on_success_task_signal = signal('success_task_signal') -on_failure_task_signal = signal('failure_task_signal') - -# workflow engine workflow signals: -start_workflow_signal = signal('start_workflow_signal') -on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal') -on_cancelled_workflow_signal = signal('on_cancelled_workflow_signal') -on_success_workflow_signal = signal('on_success_workflow_signal') -on_failure_workflow_signal = signal('on_failure_workflow_signal') - -plugin_installer( - path=os.path.dirname(os.path.realpath(__file__)), - plugin_suffix='_event_handler', - package=__package__) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/events/builtin_event_handler.py ---------------------------------------------------------------------- diff --git a/aria/events/builtin_event_handler.py b/aria/events/builtin_event_handler.py deleted file mode 100644 index c5cccfe..0000000 --- a/aria/events/builtin_event_handler.py +++ /dev/null @@ -1,123 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Aria's events Sub-Package -Path: aria.events.storage_event_handler - -Implementation of storage handlers for workflow and operation events. -""" - - -from datetime import ( - datetime, - timedelta, -) - -from . import ( - start_workflow_signal, - on_success_workflow_signal, - on_failure_workflow_signal, - on_cancelled_workflow_signal, - on_cancelling_workflow_signal, - sent_task_signal, - start_task_signal, - on_success_task_signal, - on_failure_task_signal, -) - - -@sent_task_signal.connect -def _task_sent(task, *args, **kwargs): - with task._update(): - task.status = task.SENT - - -@start_task_signal.connect -def _task_started(task, *args, **kwargs): - with task._update(): - task.started_at = datetime.utcnow() - task.status = task.STARTED - - -@on_failure_task_signal.connect -def _task_failed(task, *args, **kwargs): - with task._update(): - should_retry = ( - (task.retry_count < task.max_attempts - 1 or - task.max_attempts == task.INFINITE_RETRIES) and - # ignore_failure check here means the task will not be retries and it will be marked as - # failed. The engine will also look at ignore_failure so it won't fail the workflow. - not task.ignore_failure) - if should_retry: - task.status = task.RETRYING - task.retry_count += 1 - task.due_at = datetime.utcnow() + timedelta(seconds=task.retry_interval) - else: - task.ended_at = datetime.utcnow() - task.status = task.FAILED - - -@on_success_task_signal.connect -def _task_succeeded(task, *args, **kwargs): - with task._update(): - task.ended_at = datetime.utcnow() - task.status = task.SUCCESS - - -@start_workflow_signal.connect -def _workflow_started(workflow_context, *args, **kwargs): - execution = workflow_context.execution - execution.status = execution.STARTED - execution.started_at = datetime.utcnow() - workflow_context.execution = execution - - -@on_failure_workflow_signal.connect -def _workflow_failed(workflow_context, exception, *args, **kwargs): - execution = workflow_context.execution - execution.error = str(exception) - execution.status = execution.FAILED - execution.ended_at = datetime.utcnow() - workflow_context.execution = execution - - -@on_success_workflow_signal.connect -def _workflow_succeeded(workflow_context, *args, **kwargs): - execution = workflow_context.execution - execution.status = execution.TERMINATED - execution.ended_at = datetime.utcnow() - workflow_context.execution = execution - - -@on_cancelled_workflow_signal.connect -def _workflow_cancelled(workflow_context, *args, **kwargs): - execution = workflow_context.execution - # _workflow_cancelling function may have called this function - # already - if execution.status == execution.CANCELLED: - return - execution.status = execution.CANCELLED - execution.ended_at = datetime.utcnow() - workflow_context.execution = execution - - -@on_cancelling_workflow_signal.connect -def _workflow_cancelling(workflow_context, *args, **kwargs): - execution = workflow_context.execution - if execution.status == execution.PENDING: - return _workflow_cancelled(workflow_context=workflow_context) - execution.status = execution.CANCELLING - workflow_context.execution = execution http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/events/workflow_engine_event_handler.py ---------------------------------------------------------------------- diff --git a/aria/events/workflow_engine_event_handler.py b/aria/events/workflow_engine_event_handler.py deleted file mode 100644 index 7df11d1..0000000 --- a/aria/events/workflow_engine_event_handler.py +++ /dev/null @@ -1,74 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -""" -Aria's events Sub-Package -Path: aria.events.storage_event_handler - -Implementation of logger handlers for workflow and operation events. -""" - -from . import ( - start_task_signal, - on_success_task_signal, - on_failure_task_signal, - start_workflow_signal, - on_success_workflow_signal, - on_failure_workflow_signal, - on_cancelled_workflow_signal, - on_cancelling_workflow_signal, -) - - -@start_task_signal.connect -def _start_task_handler(task, **kwargs): - task.logger.debug('Event: Starting task: {task.name}'.format(task=task)) - - -@on_success_task_signal.connect -def _success_task_handler(task, **kwargs): - task.logger.debug('Event: Task success: {task.name}'.format(task=task)) - - -@on_failure_task_signal.connect -def _failure_operation_handler(task, **kwargs): - task.logger.error('Event: Task failure: {task.name}'.format(task=task), - exc_info=kwargs.get('exception', True)) - - -@start_workflow_signal.connect -def _start_workflow_handler(context, **kwargs): - context.logger.debug('Event: Starting workflow: {context.name}'.format(context=context)) - - -@on_failure_workflow_signal.connect -def _failure_workflow_handler(context, **kwargs): - context.logger.debug('Event: Workflow failure: {context.name}'.format(context=context)) - - -@on_success_workflow_signal.connect -def _success_workflow_handler(context, **kwargs): - context.logger.debug('Event: Workflow success: {context.name}'.format(context=context)) - - -@on_cancelled_workflow_signal.connect -def _cancel_workflow_handler(context, **kwargs): - context.logger.debug('Event: Workflow cancelled: {context.name}'.format(context=context)) - - -@on_cancelling_workflow_signal.connect -def _cancelling_workflow_handler(context, **kwargs): - context.logger.debug('Event: Workflow cancelling: {context.name}'.format(context=context)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/exceptions.py b/aria/exceptions.py index d68b5a2..472b264 100644 --- a/aria/exceptions.py +++ b/aria/exceptions.py @@ -19,18 +19,9 @@ Every sub-package in Aria has a module with its exceptions. aria.exceptions module conveniently collects all these exceptions for easier imports. """ -from .workflows.exceptions import * # pylint: disable=wildcard-import,unused-wildcard-import - class AriaError(Exception): """ General aria exception """ pass - - -class StorageError(AriaError): - """ - General storage exception - """ - pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/orchestrator/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/__init__.py b/aria/orchestrator/__init__.py new file mode 100644 index 0000000..a5aeec7 --- /dev/null +++ b/aria/orchestrator/__init__.py @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .decorators import workflow, operation + +from . import ( + context, + events, + workflows, + decorators +) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/orchestrator/context/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/__init__.py b/aria/orchestrator/context/__init__.py new file mode 100644 index 0000000..ad89b13 --- /dev/null +++ b/aria/orchestrator/context/__init__.py @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Provides contexts to workflow and operation +""" + +from . import workflow, operation +from .toolbelt import toolbelt http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py new file mode 100644 index 0000000..48f4557 --- /dev/null +++ b/aria/orchestrator/context/common.py @@ -0,0 +1,145 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +A common context for both workflow and operation +""" +from uuid import uuid4 + +from aria import logger +from aria.storage import exceptions +from aria.tools.lru_cache import lru_cache + + +class BaseContext(logger.LoggerMixin): + """ + Base context object for workflow and operation + """ + + def __init__( + self, + name, + model_storage, + resource_storage, + deployment_id, + workflow_id, + execution_id=None, + task_max_attempts=1, + task_retry_interval=0, + task_ignore_failure=False, + **kwargs): + super(BaseContext, self).__init__(**kwargs) + self._name = name + self._id = str(uuid4()) + self._model = model_storage + self._resource = resource_storage + self._deployment_id = deployment_id + self._workflow_id = workflow_id + self._execution_id = execution_id or str(uuid4()) + self._task_max_attempts = task_max_attempts + self._task_retry_interval = task_retry_interval + self._task_ignore_failure = task_ignore_failure + + def __repr__(self): + return ( + '{name}(name={self.name}, ' + 'deployment_id={self._deployment_id}, ' + 'workflow_id={self._workflow_id}, ' + 'execution_id={self._execution_id})' + .format(name=self.__class__.__name__, self=self)) + + @property + def model(self): + """ + Access to the model storage + :return: + """ + return self._model + + @property + def resource(self): + """ + Access to the resource storage + :return: + """ + return self._resource + + @property + @lru_cache() + def blueprint(self): + """ + The blueprint model + """ + return self.model.blueprint.get(self.deployment.blueprint_id) + + @property + @lru_cache() + def deployment(self): + """ + The deployment model + """ + return self.model.deployment.get(self._deployment_id) + + @property + def execution(self): + """ + The execution model + """ + return self.model.execution.get(self._execution_id) + + @execution.setter + def execution(self, value): + """ + Store the execution in the model storage + """ + self.model.execution.store(value) + + @property + def name(self): + """ + The operation name + :return: + """ + return self._name + + @property + def id(self): + """ + The operation id + :return: + """ + return self._id + + def download_resource(self, destination, path=None): + """ + Download a blueprint resource from the resource storage + """ + try: + return self.resource.deployment.download(entry_id=self.deployment.id, + destination=destination, + path=path) + except exceptions.StorageError: + return self.resource.blueprint.download(entry_id=self.blueprint.id, + destination=destination, + path=path) + + @lru_cache() + def get_resource(self, path=None): + """ + Read a deployment resource as string from the resource storage + """ + try: + return self.resource.deployment.data(entry_id=self.deployment.id, path=path) + except exceptions.StorageError: + return self.resource.blueprint.data(entry_id=self.blueprint.id, path=path) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/orchestrator/context/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/exceptions.py b/aria/orchestrator/context/exceptions.py new file mode 100644 index 0000000..6704bbc --- /dev/null +++ b/aria/orchestrator/context/exceptions.py @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ..exceptions import OrchestratorError + + +class ContextException(OrchestratorError): + """ + Context based exception + """ + pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py new file mode 100644 index 0000000..bf3686d --- /dev/null +++ b/aria/orchestrator/context/operation.py @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Workflow and operation contexts +""" + + +from .common import BaseContext + + +class BaseOperationContext(BaseContext): + """ + Context object used during operation creation and execution + """ + + def __init__(self, name, workflow_context, task, **kwargs): + super(BaseOperationContext, self).__init__( + name=name, + model_storage=workflow_context.model, + resource_storage=workflow_context.resource, + deployment_id=workflow_context._deployment_id, + workflow_id=workflow_context._workflow_id, + execution_id=workflow_context._execution_id, + **kwargs) + self._task_model = task + self._actor = self.task.actor + + def __repr__(self): + details = 'operation_mapping={task.operation_mapping}; ' \ + 'operation_inputs={task.inputs}'\ + .format(task=self.task) + return '{name}({0})'.format(details, name=self.name) + + @property + def task(self): + """ + The task in the model storage + :return: Task model + """ + return self._task_model + + +class NodeOperationContext(BaseOperationContext): + """ + Context for node based operations. + """ + @property + def node(self): + """ + the node of the current operation + :return: + """ + return self._actor.node + + @property + def node_instance(self): + """ + The node instance of the current operation + :return: + """ + return self._actor + + +class RelationshipOperationContext(BaseOperationContext): + """ + Context for relationship based operations. + """ + @property + def source_node(self): + """ + The source node + :return: + """ + return self.model.node.get(self.relationship.source_id) + + @property + def source_node_instance(self): + """ + The source node instance + :return: + """ + return self.model.node_instance.get(self.relationship_instance.source_id) + + @property + def target_node(self): + """ + The target node + :return: + """ + return self.model.node.get(self.relationship.target_id) + + @property + def target_node_instance(self): + """ + The target node instance + :return: + """ + return self.model.node_instance.get(self._actor.target_id) + + @property + def relationship(self): + """ + The relationship of the current operation + :return: + """ + return self._actor.relationship + + @property + def relationship_instance(self): + """ + The relationship instance of the current operation + :return: + """ + return self._actor http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/orchestrator/context/toolbelt.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/toolbelt.py b/aria/orchestrator/context/toolbelt.py new file mode 100644 index 0000000..0aad89c --- /dev/null +++ b/aria/orchestrator/context/toolbelt.py @@ -0,0 +1,75 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Provides with different tools for operations. +""" + +from . import operation + + +class NodeToolBelt(object): + """ + Node operation related tool belt + """ + def __init__(self, operation_context): + self._op_context = operation_context + + @property + def dependent_node_instances(self): + """ + Any node instance which has a relationship to the current node instance. + :return: + """ + assert isinstance(self._op_context, operation.NodeOperationContext) + node_instances = self._op_context.model.node_instance.iter( + filters={'deployment_id': self._op_context.deployment.id} + ) + for node_instance in node_instances: + for relationship_instance in node_instance.relationship_instances: + if relationship_instance.target_id == self._op_context.node_instance.id: + yield node_instance + + @property + def host_ip(self): + """ + The host ip of the current node + :return: + """ + assert isinstance(self._op_context, operation.NodeOperationContext) + host_id = self._op_context._actor.host_id + host_instance = self._op_context.model.node_instance.get(host_id) + return host_instance.runtime_properties.get('ip') + + +class RelationshipToolBelt(object): + """ + Relationship operation related tool belt + """ + def __init__(self, operation_context): + self._op_context = operation_context + + +def toolbelt(operation_context): + """ + Get a toolbelt according to the current operation executor + :param operation_context: + :return: + """ + if isinstance(operation_context, operation.NodeOperationContext): + return NodeToolBelt(operation_context) + elif isinstance(operation_context, operation.RelationshipOperationContext): + return RelationshipToolBelt(operation_context) + else: + raise RuntimeError("Operation context not supported") http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py new file mode 100644 index 0000000..3dc222b --- /dev/null +++ b/aria/orchestrator/context/workflow.py @@ -0,0 +1,113 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Workflow and operation contexts +""" + +import threading +from contextlib import contextmanager + +from aria import storage + +from .exceptions import ContextException +from .common import BaseContext + + +class WorkflowContext(BaseContext): + """ + Context object used during workflow creation and execution + """ + def __init__(self, parameters=None, *args, **kwargs): + super(WorkflowContext, self).__init__(*args, **kwargs) + self.parameters = parameters or {} + # TODO: execution creation should happen somewhere else + # should be moved there, when such logical place exists + try: + self.model.execution.get(self._execution_id) + except storage.exceptions.StorageError: + self._create_execution() + + def __repr__(self): + return ( + '{name}(deployment_id={self._deployment_id}, ' + 'workflow_id={self._workflow_id}, ' + 'execution_id={self._execution_id})'.format( + name=self.__class__.__name__, self=self)) + + def _create_execution(self): + execution_cls = self.model.execution.model_cls + execution = self.model.execution.model_cls( + id=self._execution_id, + deployment_id=self.deployment.id, + workflow_id=self._workflow_id, + blueprint_id=self.blueprint.id, + status=execution_cls.PENDING, + parameters=self.parameters, + ) + self.model.execution.store(execution) + + @property + def nodes(self): + """ + Iterator over nodes + """ + return self.model.node.iter(filters={'blueprint_id': self.blueprint.id}) + + @property + def node_instances(self): + """ + Iterator over node instances + """ + return self.model.node_instance.iter(filters={'deployment_id': self.deployment.id}) + + +class _CurrentContext(threading.local): + """ + Provides thread-level context, which sugarcoats the task api. + """ + + def __init__(self): + super(_CurrentContext, self).__init__() + self._workflow_context = None + + def _set(self, value): + self._workflow_context = value + + def get(self): + """ + Retrieves the current workflow context + :return: the workflow context + :rtype: WorkflowContext + """ + if self._workflow_context is not None: + return self._workflow_context + raise ContextException("No context was set") + + @contextmanager + def push(self, workflow_context): + """ + Switches the current context to the provided context + :param workflow_context: the context to switch to. + :yields: the current context + """ + prev_workflow_context = self._workflow_context + self._set(workflow_context) + try: + yield self + finally: + self._set(prev_workflow_context) + +current = _CurrentContext() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/orchestrator/decorators.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/decorators.py b/aria/orchestrator/decorators.py new file mode 100644 index 0000000..7f7685d --- /dev/null +++ b/aria/orchestrator/decorators.py @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Workflow and operation decorators +""" + +from uuid import uuid4 +from functools import partial, wraps + +from aria.tools.validation import validate_function_arguments + +from . import context +from .workflows.api import task_graph + + +def workflow(func=None, suffix_template=''): + """ + Workflow decorator + """ + if func is None: + return partial(workflow, suffix_template=suffix_template) + + @wraps(func) + def _wrapper(ctx, **workflow_parameters): + + workflow_name = _generate_name( + func_name=func.__name__, + suffix_template=suffix_template, + ctx=ctx, + **workflow_parameters) + + workflow_parameters.setdefault('ctx', ctx) + workflow_parameters.setdefault('graph', task_graph.TaskGraph(workflow_name)) + validate_function_arguments(func, workflow_parameters) + with context.workflow.current.push(ctx): + func(**workflow_parameters) + return workflow_parameters['graph'] + return _wrapper + + +def operation(func=None, toolbelt=False, suffix_template=''): + """ + Operation decorator + """ + if func is None: + return partial(operation, suffix_template=suffix_template, toolbelt=toolbelt) + + @wraps(func) + def _wrapper(**func_kwargs): + if toolbelt: + operation_toolbelt = context.toolbelt(func_kwargs['ctx']) + func_kwargs.setdefault('toolbelt', operation_toolbelt) + validate_function_arguments(func, func_kwargs) + return func(**func_kwargs) + return _wrapper + + +def _generate_name(func_name, ctx, suffix_template, **custom_kwargs): + return '{func_name}.{suffix}'.format( + func_name=func_name, + suffix=suffix_template.format(ctx=ctx, **custom_kwargs) or str(uuid4())) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/orchestrator/events/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/events/__init__.py b/aria/orchestrator/events/__init__.py new file mode 100644 index 0000000..40bdd24 --- /dev/null +++ b/aria/orchestrator/events/__init__.py @@ -0,0 +1,57 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +ARIA's events Sub-Package +Path: aria.events + +Events package provides events mechanism for different executions in aria. + + +1. storage_event_handler: implementation of storage handlers for workflow and operation events. +2. logger_event_handler: implementation of logger handlers for workflow and operation events. + +API: + * start_task_signal + * on_success_task_signal + * on_failure_task_signal + * start_workflow_signal + * on_success_workflow_signal + * on_failure_workflow_signal +""" + +import os + +from blinker import signal + +from aria.tools import plugin_installer + +# workflow engine task signals: +sent_task_signal = signal('sent_task_signal') +start_task_signal = signal('start_task_signal') +on_success_task_signal = signal('success_task_signal') +on_failure_task_signal = signal('failure_task_signal') + +# workflow engine workflow signals: +start_workflow_signal = signal('start_workflow_signal') +on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal') +on_cancelled_workflow_signal = signal('on_cancelled_workflow_signal') +on_success_workflow_signal = signal('on_success_workflow_signal') +on_failure_workflow_signal = signal('on_failure_workflow_signal') + +plugin_installer( + path=os.path.dirname(os.path.realpath(__file__)), + plugin_suffix='_event_handler', + package=__package__) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/orchestrator/events/builtin_event_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/events/builtin_event_handler.py b/aria/orchestrator/events/builtin_event_handler.py new file mode 100644 index 0000000..c5cccfe --- /dev/null +++ b/aria/orchestrator/events/builtin_event_handler.py @@ -0,0 +1,123 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Aria's events Sub-Package +Path: aria.events.storage_event_handler + +Implementation of storage handlers for workflow and operation events. +""" + + +from datetime import ( + datetime, + timedelta, +) + +from . import ( + start_workflow_signal, + on_success_workflow_signal, + on_failure_workflow_signal, + on_cancelled_workflow_signal, + on_cancelling_workflow_signal, + sent_task_signal, + start_task_signal, + on_success_task_signal, + on_failure_task_signal, +) + + +@sent_task_signal.connect +def _task_sent(task, *args, **kwargs): + with task._update(): + task.status = task.SENT + + +@start_task_signal.connect +def _task_started(task, *args, **kwargs): + with task._update(): + task.started_at = datetime.utcnow() + task.status = task.STARTED + + +@on_failure_task_signal.connect +def _task_failed(task, *args, **kwargs): + with task._update(): + should_retry = ( + (task.retry_count < task.max_attempts - 1 or + task.max_attempts == task.INFINITE_RETRIES) and + # ignore_failure check here means the task will not be retries and it will be marked as + # failed. The engine will also look at ignore_failure so it won't fail the workflow. + not task.ignore_failure) + if should_retry: + task.status = task.RETRYING + task.retry_count += 1 + task.due_at = datetime.utcnow() + timedelta(seconds=task.retry_interval) + else: + task.ended_at = datetime.utcnow() + task.status = task.FAILED + + +@on_success_task_signal.connect +def _task_succeeded(task, *args, **kwargs): + with task._update(): + task.ended_at = datetime.utcnow() + task.status = task.SUCCESS + + +@start_workflow_signal.connect +def _workflow_started(workflow_context, *args, **kwargs): + execution = workflow_context.execution + execution.status = execution.STARTED + execution.started_at = datetime.utcnow() + workflow_context.execution = execution + + +@on_failure_workflow_signal.connect +def _workflow_failed(workflow_context, exception, *args, **kwargs): + execution = workflow_context.execution + execution.error = str(exception) + execution.status = execution.FAILED + execution.ended_at = datetime.utcnow() + workflow_context.execution = execution + + +@on_success_workflow_signal.connect +def _workflow_succeeded(workflow_context, *args, **kwargs): + execution = workflow_context.execution + execution.status = execution.TERMINATED + execution.ended_at = datetime.utcnow() + workflow_context.execution = execution + + +@on_cancelled_workflow_signal.connect +def _workflow_cancelled(workflow_context, *args, **kwargs): + execution = workflow_context.execution + # _workflow_cancelling function may have called this function + # already + if execution.status == execution.CANCELLED: + return + execution.status = execution.CANCELLED + execution.ended_at = datetime.utcnow() + workflow_context.execution = execution + + +@on_cancelling_workflow_signal.connect +def _workflow_cancelling(workflow_context, *args, **kwargs): + execution = workflow_context.execution + if execution.status == execution.PENDING: + return _workflow_cancelled(workflow_context=workflow_context) + execution.status = execution.CANCELLING + workflow_context.execution = execution http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/orchestrator/events/workflow_engine_event_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/events/workflow_engine_event_handler.py b/aria/orchestrator/events/workflow_engine_event_handler.py new file mode 100644 index 0000000..7df11d1 --- /dev/null +++ b/aria/orchestrator/events/workflow_engine_event_handler.py @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" +Aria's events Sub-Package +Path: aria.events.storage_event_handler + +Implementation of logger handlers for workflow and operation events. +""" + +from . import ( + start_task_signal, + on_success_task_signal, + on_failure_task_signal, + start_workflow_signal, + on_success_workflow_signal, + on_failure_workflow_signal, + on_cancelled_workflow_signal, + on_cancelling_workflow_signal, +) + + +@start_task_signal.connect +def _start_task_handler(task, **kwargs): + task.logger.debug('Event: Starting task: {task.name}'.format(task=task)) + + +@on_success_task_signal.connect +def _success_task_handler(task, **kwargs): + task.logger.debug('Event: Task success: {task.name}'.format(task=task)) + + +@on_failure_task_signal.connect +def _failure_operation_handler(task, **kwargs): + task.logger.error('Event: Task failure: {task.name}'.format(task=task), + exc_info=kwargs.get('exception', True)) + + +@start_workflow_signal.connect +def _start_workflow_handler(context, **kwargs): + context.logger.debug('Event: Starting workflow: {context.name}'.format(context=context)) + + +@on_failure_workflow_signal.connect +def _failure_workflow_handler(context, **kwargs): + context.logger.debug('Event: Workflow failure: {context.name}'.format(context=context)) + + +@on_success_workflow_signal.connect +def _success_workflow_handler(context, **kwargs): + context.logger.debug('Event: Workflow success: {context.name}'.format(context=context)) + + +@on_cancelled_workflow_signal.connect +def _cancel_workflow_handler(context, **kwargs): + context.logger.debug('Event: Workflow cancelled: {context.name}'.format(context=context)) + + +@on_cancelling_workflow_signal.connect +def _cancelling_workflow_handler(context, **kwargs): + context.logger.debug('Event: Workflow cancelling: {context.name}'.format(context=context)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/orchestrator/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py new file mode 100644 index 0000000..75b37cf --- /dev/null +++ b/aria/orchestrator/exceptions.py @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from aria.exceptions import AriaError + + +class OrchestratorError(AriaError): + pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/orchestrator/workflows/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/__init__.py b/aria/orchestrator/workflows/__init__.py new file mode 100644 index 0000000..ae1e83e --- /dev/null +++ b/aria/orchestrator/workflows/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/orchestrator/workflows/api/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/__init__.py b/aria/orchestrator/workflows/api/__init__.py new file mode 100644 index 0000000..a3a17ee --- /dev/null +++ b/aria/orchestrator/workflows/api/__init__.py @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Provides API for building tasks +""" + +from . import task, task_graph http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/17510b02/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py new file mode 100644 index 0000000..4d36725 --- /dev/null +++ b/aria/orchestrator/workflows/api/task.py @@ -0,0 +1,171 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Provides the tasks to be entered into the task graph +""" +from uuid import uuid4 + +import aria + +from ... import context +from .. import exceptions + + +class BaseTask(object): + """ + Abstract task_graph task + """ + def __init__(self, ctx=None, **kwargs): + if ctx is not None: + self._workflow_context = ctx + else: + self._workflow_context = context.workflow.current.get() + self._id = str(uuid4()) + + @property + def id(self): + """ + uuid4 generated id + :return: + """ + return self._id + + @property + def workflow_context(self): + """ + the context of the current workflow + :return: + """ + return self._workflow_context + + +class OperationTask(BaseTask): + """ + Represents an operation task in the task_graph + """ + + SOURCE_OPERATION = 'source_operations' + TARGET_OPERATION = 'target_operations' + + def __init__(self, + name, + actor, + operation_mapping, + max_attempts=None, + retry_interval=None, + ignore_failure=None, + inputs=None): + """ + Creates an operation task using the name, details, node instance and any additional kwargs. + :param name: the operation of the name. + :param operation_details: the details for the operation. + :param actor: the operation host on which this operation is registered. + :param inputs: operation inputs. + """ + assert isinstance(actor, (aria.storage.models.NodeInstance, + aria.storage.models.RelationshipInstance)) + super(OperationTask, self).__init__() + self.actor = actor + self.name = '{name}.{actor.id}'.format(name=name, actor=actor) + self.operation_mapping = operation_mapping + self.inputs = inputs or {} + self.max_attempts = (self.workflow_context._task_max_attempts + if max_attempts is None else max_attempts) + self.retry_interval = (self.workflow_context._task_retry_interval + if retry_interval is None else retry_interval) + self.ignore_failure = (self.workflow_context._task_ignore_failure + if ignore_failure is None else ignore_failure) + + @classmethod + def node_instance(cls, instance, name, inputs=None, *args, **kwargs): + """ + Represents a node based operation + + :param instance: the node of which this operation belongs to. + :param name: the name of the operation. + """ + assert isinstance(instance, aria.storage.models.NodeInstance) + operation_details = instance.node.operations[name] + operation_inputs = operation_details.get('inputs', {}) + operation_inputs.update(inputs or {}) + return cls(name=name, + actor=instance, + operation_mapping=operation_details.get('operation', ''), + inputs=operation_inputs, + *args, + **kwargs) + + @classmethod + def relationship_instance(cls, instance, name, operation_end, inputs=None, *args, **kwargs): + """ + Represents a relationship based operation + + :param instance: the relationship of which this operation belongs to. + :param name: the name of the operation. + :param operation_end: source or target end of the relationship, this corresponds directly + with 'source_operations' and 'target_operations' + :param inputs any additional inputs to the operation + """ + assert isinstance(instance, aria.storage.models.RelationshipInstance) + if operation_end not in [cls.TARGET_OPERATION, cls.SOURCE_OPERATION]: + raise exceptions.TaskException('The operation end should be {0} or {1}'.format( + cls.TARGET_OPERATION, cls.SOURCE_OPERATION + )) + operation_details = getattr(instance.relationship, operation_end)[name] + operation_inputs = operation_details.get('inputs', {}) + operation_inputs.update(inputs or {}) + return cls(actor=instance, + name=name, + operation_mapping=operation_details.get('operation'), + inputs=operation_inputs, + *args, + **kwargs) + + +class WorkflowTask(BaseTask): + """ + Represents an workflow task in the task_graph + """ + def __init__(self, workflow_func, **kwargs): + """ + Creates a workflow based task using the workflow_func provided, and its kwargs + :param workflow_func: the function to run + :param kwargs: the kwargs that would be passed to the workflow_func + """ + super(WorkflowTask, self).__init__(**kwargs) + kwargs['ctx'] = self.workflow_context + self._graph = workflow_func(**kwargs) + + @property + def graph(self): + """ + The graph constructed by the sub workflow + :return: + """ + return self._graph + + def __getattr__(self, item): + try: + return getattr(self._graph, item) + except AttributeError: + return super(WorkflowTask, self).__getattribute__(item) + + +class StubTask(BaseTask): + """ + Enables creating empty tasks. + """ + pass