http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 127641f..f631e79 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -68,17 +68,6 @@ class BaseContext(object): self._workdir = workdir self.logger = None - def _create_execution(self): - now = datetime.utcnow() - execution = self.model.execution.model_cls( - service_instance=self.service_instance, - workflow_name=self._workflow_name, - created_at=now, - parameters=self.parameters, - ) - self.model.execution.put(execution) - return execution.id - def _register_logger(self, logger_name=None, level=None, task_id=None): self.logger = self.PrefixedLogger(logging.getLogger(logger_name or self.__class__.__name__), self.logging_id, @@ -168,13 +157,13 @@ class BaseContext(object): Download a blueprint resource from the resource storage """ try: - self.resource.deployment.download(entry_id=str(self.service.id), - destination=destination, - path=path) + self.resource.service.download(entry_id=str(self.service.id), + destination=destination, + path=path) except exceptions.StorageError: - self.resource.blueprint.download(entry_id=str(self.service_template.id), - destination=destination, - path=path) + self.resource.service_template.download(entry_id=str(self.service_template.id), + destination=destination, + path=path) def download_resource_and_render(self, destination, path=None, variables=None): """ @@ -193,9 +182,9 @@ class BaseContext(object): Read a deployment resource as string from the resource storage """ try: - return self.resource.deployment.read(entry_id=str(self.service.id), path=path) + return self.resource.service.read(entry_id=str(self.service.id), path=path) except exceptions.StorageError: - return self.resource.deployment.read(entry_id=str(self.service_template.id), path=path) + return self.resource.service_template.read(entry_id=str(self.service_template.id), path=path) def get_resource_and_render(self, path=None, variables=None): """
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py index 5f86d9d..bc9f653 100644 --- a/aria/orchestrator/context/workflow.py +++ b/aria/orchestrator/context/workflow.py @@ -31,11 +31,11 @@ class WorkflowContext(BaseContext): """ def __init__(self, workflow_name, + execution_id, parameters=None, task_max_attempts=1, task_retry_interval=0, task_ignore_failure=False, - execution_id=None, *args, **kwargs): super(WorkflowContext, self).__init__(*args, **kwargs) self._workflow_name = workflow_name @@ -43,28 +43,15 @@ class WorkflowContext(BaseContext): self._task_max_attempts = task_max_attempts self._task_retry_interval = task_retry_interval self._task_ignore_failure = task_ignore_failure - # TODO: execution creation should happen somewhere else - # should be moved there, when such logical place exists - self._execution_id = execution_id or self._create_execution() + self._execution_id = execution_id self._register_logger() def __repr__(self): return ( '{name}(deployment_id={self._service_id}, ' - 'workflow_name={self._workflow_name}'.format( + 'workflow_name={self._workflow_name}, execution_id={self._execution_id})'.format( name=self.__class__.__name__, self=self)) - def _create_execution(self): - now = datetime.utcnow() - execution = self.model.execution.model_cls( - service=self.service, - workflow_name=self._workflow_name, - created_at=now, - parameters=self.parameters, - ) - self.model.execution.put(execution) - return execution.id - @property def logging_id(self): return '{0}[{1}]'.format(self._workflow_name, self._execution_id) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/orchestrator/execution_plugin/ctx_proxy/server.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py index 817d064..52a5312 100644 --- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py +++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py @@ -24,6 +24,7 @@ import StringIO import wsgiref.simple_server import bottle +from aria import modeling from .. import exceptions @@ -111,7 +112,7 @@ class CtxProxy(object): result = json.dumps({ 'type': result_type, 'payload': payload - }) + }, cls=modeling.utils.ModelJSONEncoder) except Exception as e: traceback_out = StringIO.StringIO() traceback.print_exc(file=traceback_out) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/orchestrator/runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/runner.py b/aria/orchestrator/runner.py deleted file mode 100644 index f1633fa..0000000 --- a/aria/orchestrator/runner.py +++ /dev/null @@ -1,101 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Workflow runner -""" - -import tempfile -import os - -from .context.workflow import WorkflowContext -from .workflows.core.engine import Engine -from .workflows.executor.thread import ThreadExecutor -from ..storage import ( - sql_mapi, - filesystem_rapi, -) -from .. import ( - application_model_storage, - application_resource_storage -) - - -class Runner(object): - """ - Runs workflows on a deployment. By default uses temporary storage (either on disk or in memory) - but can also be used with existing storage. - - Handles the initialization of the storage engine and provides convenience methods for - sub-classes to create tasks. - - :param path: path to Sqlite database file; use '' (the default) to use a temporary file, - and None to use an in-memory database - :type path: string - """ - - def __init__(self, workflow_name, workflow_fn, inputs, initialize_model_storage_fn, - service_id_fn, storage_path='', is_storage_temporary=True): - if storage_path == '': - # Temporary file storage - the_file, storage_path = tempfile.mkstemp(suffix='.db', prefix='aria-') - os.close(the_file) - - self._storage_path = storage_path - self._storage_dir = os.path.dirname(storage_path) - self._storage_name = os.path.basename(storage_path) - self._is_storage_temporary = is_storage_temporary - - workflow_context = self.create_workflow_context(workflow_name, initialize_model_storage_fn, - service_id_fn) - - tasks_graph = workflow_fn(ctx=workflow_context, **inputs) - - self._engine = Engine( - executor=ThreadExecutor(), - workflow_context=workflow_context, - tasks_graph=tasks_graph) - - def run(self): - try: - self._engine.execute() - finally: - self.cleanup() - - def create_workflow_context(self, - workflow_name, - initialize_model_storage_fn, - service_id_fn): - self.cleanup() - model_storage = application_model_storage( - sql_mapi.SQLAlchemyModelAPI, - initiator_kwargs=dict(base_dir=self._storage_dir, filename=self._storage_name)) - if initialize_model_storage_fn: - initialize_model_storage_fn(model_storage) - resource_storage = application_resource_storage( - filesystem_rapi.FileSystemResourceAPI, api_kwargs=dict(directory='.')) - return WorkflowContext( - name=workflow_name, - model_storage=model_storage, - resource_storage=resource_storage, - service_id=service_id_fn(), - workflow_name=self.__class__.__name__, - task_max_attempts=1, - task_retry_interval=1) - - def cleanup(self): - if (self._is_storage_temporary and (self._storage_path is not None) and - os.path.isfile(self._storage_path)): - os.remove(self._storage_path) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py new file mode 100644 index 0000000..8b6b431 --- /dev/null +++ b/aria/orchestrator/workflow_runner.py @@ -0,0 +1,147 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Workflow runner +""" + +import sys +from datetime import datetime + +from .context.workflow import WorkflowContext +from .workflows.builtin import BUILTIN_WORKFLOWS, BUILTIN_WORKFLOWS_PATH_PREFIX +from .workflows.core.engine import Engine +from .workflows.executor.process import ProcessExecutor +from ..exceptions import AriaException +from ..modeling import utils as modeling_utils +from ..modeling import models +from ..utils.imports import import_fullname + + +DEFAULT_TASK_MAX_ATTEMPTS = 1 +DEFAULT_TASK_RETRY_INTERVAL = 1 +# TODO move this constant somewhere in the DSL parser? +WORKFLOW_POLICY_INTERNAL_PROPERTIES = ('implementation', 'dependencies') + + +class WorkflowRunner(object): + + def __init__(self, workflow_name, service_name, inputs, + model_storage, resource_storage, plugin_manager, + task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS, + task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL): + + self._model_storage = model_storage + self._workflow_name = workflow_name + service = model_storage.service.get_by_name(service_name) + # the IDs are stored rather than the models themselves, so this module could be used + # by several threads without raising errors on model objects shared between threads + self._service_id = service.id + + self._validate_workflow_exists_for_service() + + workflow_fn = self._get_workflow_fn() + + execution = self._create_execution_models(inputs) + self._execution_id = execution.id + + workflow_context = WorkflowContext( + name=self.__class__.__name__, + model_storage=self._model_storage, + resource_storage=resource_storage, + service_id=service.id, + execution_id=execution.id, + workflow_name=workflow_name, + task_max_attempts=task_max_attempts, + task_retry_interval=task_retry_interval) + + # transforming the execution inputs to dict, to pass them to the workflow function + execution_inputs_dict = {input.name: input.value for input in + self.execution.inputs.values()} + self._tasks_graph = workflow_fn(ctx=workflow_context, **execution_inputs_dict) + + self._engine = Engine( + executor=ProcessExecutor(plugin_manager=plugin_manager), + workflow_context=workflow_context, + tasks_graph=self._tasks_graph) + + @property + def execution(self): + return self._model_storage.execution.get(self._execution_id) + + @property + def service(self): + return self._model_storage.service.get(self._service_id) + + def execute(self): + #TODO uncomment, commented for testing purposes + # self._validate_no_active_executions() + self._engine.execute() + + def cancel(self): + self._engine.cancel_execution() + + def _create_execution_models(self, inputs): + execution = models.Execution( + created_at=datetime.utcnow(), + service=self.service, + workflow_name=self._workflow_name, + inputs={}) + + # built-in workflows don't have any inputs, and are also + # not a part of the service's workflows field + if self._workflow_name not in BUILTIN_WORKFLOWS: + workflow_inputs = {k: v for k, v in + self.service.workflows[self._workflow_name].inputs + if k not in WORKFLOW_POLICY_INTERNAL_PROPERTIES} + + execution.inputs = modeling_utils.create_inputs(inputs, workflow_inputs) + + self._model_storage.execution.put(execution) + return execution + + def _validate_workflow_exists_for_service(self): + if self._workflow_name not in self.service.workflows and \ + self._workflow_name not in BUILTIN_WORKFLOWS: + raise AriaException('No workflow policy {0} declared in service instance {1}' + .format(self._workflow_name, self.service.name)) + + def _validate_no_active_executions(self): + active_executions_filter = dict(service=self.service, + status=models.Execution.ACTIVE_STATES) + active_executions = self._model_storage.execution.list(filter=active_executions_filter) + if active_executions: + raise AriaException("Can't start execution; Service {0} has a running " + "execution with id {1}" + .format(self.service.name, active_executions[0].id)) + + def _get_workflow_fn(self): + if self._workflow_name in BUILTIN_WORKFLOWS: + return import_fullname('{0}.{1}'.format(BUILTIN_WORKFLOWS_PATH_PREFIX, + self._workflow_name)) + + workflow = self.service.workflows[self._workflow_name] + + try: + # TODO: perhaps pass to import_fullname as paths instead of appending to sys path? + # TODO: revisit; workflow.implementation to be used instead? + sys.path.append(workflow.properties['implementation'].value) + # sys.path.append(os.path.dirname(str(context.presentation.location))) + except KeyError: + # no implementation field - a script has been provided directly + pass + + workflow_fn = import_fullname(workflow.properties['implementation'].value) + return workflow_fn http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index f49ec2e..2ec85b9 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -19,7 +19,7 @@ Provides the tasks to be entered into the task graph import copy from ....modeling import models -from ....utils.collections import OrderedDict +from ....modeling import utils as modeling_utils from ....utils.uuid import generate_uuid from ... import context from .. import exceptions @@ -63,7 +63,6 @@ class OperationTask(BaseTask): def __init__(self, actor, - actor_type, interface_name, operation_name, runs_on=None, @@ -76,6 +75,7 @@ class OperationTask(BaseTask): :meth:`for_relationship`. """ + actor_type = type(actor).__name__.lower() assert isinstance(actor, (models.Node, models.Relationship)) assert actor_type in ('node', 'relationship') assert interface_name and operation_name @@ -93,22 +93,7 @@ class OperationTask(BaseTask): self.interface_name = interface_name self.operation_name = operation_name - # Wrap inputs - inputs = copy.deepcopy(inputs) if inputs else {} - for k, v in inputs.iteritems(): - if not isinstance(v, models.Parameter): - inputs[k] = models.Parameter.wrap(k, v) - - # TODO: Suggestion: these extra inputs could be stored as a separate entry in the task - # model, because they are different from the operation inputs. If we do this, then the two - # kinds of inputs should *not* be merged here. - - operation = self._get_operation() - if operation is None: - raise exceptions.OperationNotFoundException( - 'Could not find operation "{0}" on interface "{1}" for {2} "{3}"' - .format(self.operation_name, self.interface_name, actor_type, actor.name)) - + operation = self.actor.interfaces[self.interface_name].operations[self.operation_name] self.plugin = None if operation.plugin_specification: self.plugin = OperationTask._find_plugin(operation.plugin_specification) @@ -117,9 +102,8 @@ class OperationTask(BaseTask): 'Could not find plugin of operation "{0}" on interface "{1}" for {2} "{3}"' .format(self.operation_name, self.interface_name, actor_type, actor.name)) + self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs) self.implementation = operation.implementation - self.inputs = OperationTask._merge_inputs(operation.inputs, inputs) - self.name = OperationTask.NAME_FORMAT.format(type=actor_type, name=actor.name, interface=self.interface_name, @@ -128,14 +112,6 @@ class OperationTask(BaseTask): def __repr__(self): return self.name - def _get_operation(self): - interface = self.actor.interfaces.get(self.interface_name) - if interface: - return interface.operations.get(self.operation_name) - return None - - - @classmethod def for_node(cls, node, @@ -163,7 +139,6 @@ class OperationTask(BaseTask): assert isinstance(node, models.Node) return cls( actor=node, - actor_type='node', interface_name=interface_name, operation_name=operation_name, max_attempts=max_attempts, @@ -202,7 +177,6 @@ class OperationTask(BaseTask): assert runs_on in models.Task.RUNS_ON return cls( actor=relationship, - actor_type='relationship', interface_name=interface_name, operation_name=operation_name, runs_on=runs_on, @@ -216,13 +190,6 @@ class OperationTask(BaseTask): workflow_context = context.workflow.current.get() return plugin_specification.find_plugin(workflow_context.model.plugin.list()) - @staticmethod - def _merge_inputs(operation_inputs, override_inputs=None): - final_inputs = OrderedDict(operation_inputs) - if override_inputs: - final_inputs.update(override_inputs) - return final_inputs - class WorkflowTask(BaseTask): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/orchestrator/workflows/builtin/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/__init__.py b/aria/orchestrator/workflows/builtin/__init__.py index d43a962..8b13c62 100644 --- a/aria/orchestrator/workflows/builtin/__init__.py +++ b/aria/orchestrator/workflows/builtin/__init__.py @@ -24,6 +24,7 @@ from .stop import stop BUILTIN_WORKFLOWS = ('install', 'uninstall', 'start', 'stop') +BUILTIN_WORKFLOWS_PATH_PREFIX = 'aria.orchestrator.workflows.builtin' __all__ = [ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/orchestrator/workflows/builtin/execute_operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/execute_operation.py b/aria/orchestrator/workflows/builtin/execute_operation.py index 348f47a..7ee135f 100644 --- a/aria/orchestrator/workflows/builtin/execute_operation.py +++ b/aria/orchestrator/workflows/builtin/execute_operation.py @@ -17,6 +17,7 @@ Builtin execute_operation workflow """ +from . import utils from ..api.task import OperationTask from ... import workflow @@ -28,7 +29,6 @@ def execute_operation( interface_name, operation_name, operation_kwargs, - allow_kwargs_override, run_by_dependency_order, type_names, node_template_ids, @@ -41,7 +41,6 @@ def execute_operation( :param TaskGraph graph: the graph which will describe the workflow. :param basestring operation: the operation name to execute :param dict operation_kwargs: - :param bool allow_kwargs_override: :param bool run_by_dependency_order: :param type_names: :param node_template_ids: @@ -71,8 +70,7 @@ def execute_operation( node=node, interface_name=interface_name, operation_name=operation_name, - operation_kwargs=operation_kwargs, - allow_kwargs_override=allow_kwargs_override + operation_kwargs=operation_kwargs ) ) @@ -108,21 +106,16 @@ def _create_node_task( node, interface_name, operation_name, - operation_kwargs, - allow_kwargs_override): + operation_kwargs): """ A workflow which executes a single operation :param node: the node instance to install :param basestring operation: the operation name :param dict operation_kwargs: - :param bool allow_kwargs_override: :return: """ - if allow_kwargs_override is not None: - operation_kwargs['allow_kwargs_override'] = allow_kwargs_override - - return OperationTask.for_node( + return utils.create_node_task( node=node, interface_name=interface_name, operation_name=operation_name, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/orchestrator/workflows/builtin/utils.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/utils.py b/aria/orchestrator/workflows/builtin/utils.py index d79318f..8890084 100644 --- a/aria/orchestrator/workflows/builtin/utils.py +++ b/aria/orchestrator/workflows/builtin/utils.py @@ -12,26 +12,31 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from ..api.task import OperationTask + +from ..api.task import OperationTask, StubTask from .. import exceptions -def create_node_task(node, interface_name, operation_name): +def create_node_task(node, interface_name, operation_name, **kwargs): """ Returns a new operation task if the operation exists in the node, otherwise returns None. """ try: + if _is_empty_task(node, interface_name, operation_name): + return StubTask() + return OperationTask.for_node(node=node, interface_name=interface_name, - operation_name=operation_name) + operation_name=operation_name, + **kwargs) except exceptions.OperationNotFoundException: # We will skip nodes which do not have the operation return None def create_relationships_tasks( - node, interface_name, source_operation_name=None, target_operation_name=None): + node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs): """ Creates a relationship task (source and target) for all of a node_instance relationships. :param basestring source_operation_name: the relationship operation name. @@ -43,21 +48,18 @@ def create_relationships_tasks( """ sub_tasks = [] for relationship in node.outbound_relationships: - try: - relationship_operations = relationship_tasks( - relationship, - interface_name, - source_operation_name=source_operation_name, - target_operation_name=target_operation_name) - sub_tasks.append(relationship_operations) - except exceptions.OperationNotFoundException: - # We will skip relationships which do not have the operation - pass + relationship_operations = relationship_tasks( + relationship, + interface_name, + source_operation_name=source_operation_name, + target_operation_name=target_operation_name, + **kwargs) + sub_tasks.append(relationship_operations) return sub_tasks -def relationship_tasks( - relationship, interface_name, source_operation_name=None, target_operation_name=None): +def relationship_tasks(relationship, interface_name, source_operation_name=None, + target_operation_name=None, **kwargs): """ Creates a relationship task source and target. :param Relationship relationship: the relationship instance itself @@ -68,19 +70,35 @@ def relationship_tasks( """ operations = [] if source_operation_name: - operations.append( - OperationTask.for_relationship(relationship=relationship, - interface_name=interface_name, - operation_name=source_operation_name, - runs_on='source') - ) + try: + if _is_empty_task(relationship, interface_name, source_operation_name): + operations.append(StubTask()) + + operations.append( + OperationTask.for_relationship(relationship=relationship, + interface_name=interface_name, + operation_name=source_operation_name, + runs_on='source', + **kwargs) + ) + except exceptions.OperationNotFoundException: + # We will skip relationships which do not have the operation + pass if target_operation_name: - operations.append( - OperationTask.for_relationship(relationship=relationship, - interface_name=interface_name, - operation_name=target_operation_name, - runs_on='target') - ) + try: + if _is_empty_task(relationship, interface_name, target_operation_name): + operations.append(StubTask()) + + operations.append( + OperationTask.for_relationship(relationship=relationship, + interface_name=interface_name, + operation_name=target_operation_name, + runs_on='target', + **kwargs) + ) + except exceptions.OperationNotFoundException: + # We will skip relationships which do not have the operation + pass return operations @@ -108,3 +126,15 @@ def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False): graph.add_dependency(dependency, task) else: graph.add_dependency(task, dependencies) + + +def _is_empty_task(actor, interface_name, operation_name): + interface = actor.interfaces.get(interface_name) + if interface: + operation = interface.operations.get(operation_name) + if operation: + return operation.implementation is None + + raise exceptions.OperationNotFoundException( + 'Could not find operation "{0}" on interface "{1}" for {2} "{3}"' + .format(operation_name, interface_name, type(actor).__name__.lower(), actor.name)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/storage/core.py ---------------------------------------------------------------------- diff --git a/aria/storage/core.py b/aria/storage/core.py index 8302fc9..0e900bb 100644 --- a/aria/storage/core.py +++ b/aria/storage/core.py @@ -37,6 +37,7 @@ API: * drivers - module, a pool of ARIA standard drivers. * StorageDriver - class, abstract model implementation. """ +import logging from aria.logger import LoggerMixin from . import sql_mapi @@ -71,6 +72,10 @@ class Storage(LoggerMixin): :param kwargs: """ super(Storage, self).__init__(**kwargs) + # Set the logger handler of any storage object to NullHandler. + # This is since the absence of a handler shows up while using the CLI in the form of: + # `No handlers could be found for logger "aria.ResourceStorage"`. + self.logger.addHandler(logging.NullHandler()) self.api = api_cls self.registered = {} self._initiator = initiator http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/utils/archive.py ---------------------------------------------------------------------- diff --git a/aria/utils/archive.py b/aria/utils/archive.py new file mode 100644 index 0000000..5077dec --- /dev/null +++ b/aria/utils/archive.py @@ -0,0 +1,63 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import os +import tarfile +import zipfile +import tempfile +from contextlib import closing + + +def is_archive(source): + return tarfile.is_tarfile(source) or zipfile.is_zipfile(source) + + +def extract_archive(source): + if tarfile.is_tarfile(source): + return untar(source) + elif zipfile.is_zipfile(source): + return unzip(source) + raise ValueError( + 'Unsupported archive type provided or archive is not valid: {0}.'.format(source)) + + +def tar(source, destination): + with closing(tarfile.open(destination, 'w:gz')) as tar: + tar.add(source, arcname=os.path.basename(source)) + + +def untar(archive, destination=None): + if not destination: + destination = tempfile.mkdtemp() + with closing(tarfile.open(name=archive)) as tar: + tar.extractall(path=destination, members=tar.getmembers()) + return destination + + +def zip(source, destination): + with closing(zipfile.ZipFile(destination, 'w')) as zip_file: + for root, _, files in os.walk(source): + for filename in files: + file_path = os.path.join(root, filename) + source_dir = os.path.dirname(source) + zip_file.write( + file_path, os.path.relpath(file_path, source_dir)) + return destination + + +def unzip(archive, destination=None): + if not destination: + destination = tempfile.mkdtemp() + with closing(zipfile.ZipFile(archive, 'r')) as zip_file: + zip_file.extractall(destination) + return destination http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/utils/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/utils/exceptions.py b/aria/utils/exceptions.py index 9e3e80f..b60cee4 100644 --- a/aria/utils/exceptions.py +++ b/aria/utils/exceptions.py @@ -15,6 +15,7 @@ import sys import linecache +import StringIO import traceback as tb import jsonpickle @@ -89,6 +90,16 @@ def _print_stack(frame): puts(line) +def get_exception_as_string(exc_type, exc_val, traceback): + s_traceback = StringIO.StringIO() + tb.print_exception( + etype=exc_type, + value=exc_val, + tb=traceback, + file=s_traceback) + return s_traceback.getvalue() + + class _WrappedException(Exception): def __init__(self, exception_type, exception_str): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/utils/file.py ---------------------------------------------------------------------- diff --git a/aria/utils/file.py b/aria/utils/file.py index b515f70..6d1aa16 100644 --- a/aria/utils/file.py +++ b/aria/utils/file.py @@ -15,6 +15,7 @@ import errno import os +import shutil def makedirs(path): @@ -26,3 +27,15 @@ def makedirs(path): except IOError as e: if e.errno != errno.EEXIST: raise + +def remove_if_exists(path): + + try: + if os.path.isfile(path): + os.remove(path) + if os.path.isdir(path): + shutil.rmtree(path) + + except OSError as e: + if e.errno != errno.ENOENT: # errno.ENOENT = no such file or directory + raise # re-raise exception if a different error occurred http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/utils/formatting.py ---------------------------------------------------------------------- diff --git a/aria/utils/formatting.py b/aria/utils/formatting.py index 8a223e9..698393f 100644 --- a/aria/utils/formatting.py +++ b/aria/utils/formatting.py @@ -83,6 +83,49 @@ def full_type_name(value): return name if module == '__builtin__' else '%s.%s' % (module, name) +def decode_list(data): + rv = [] + for item in data: + if isinstance(item, unicode): + item = item.encode('utf-8') + elif isinstance(item, list): + item = decode_list(item) + elif isinstance(item, dict): + item = decode_dict(item) + rv.append(item) + return rv + + +def decode_dict(data): + rv = {} + for key, value in data.iteritems(): + if isinstance(key, unicode): + key = key.encode('utf-8') + if isinstance(value, unicode): + value = value.encode('utf-8') + elif isinstance(value, list): + value = decode_list(value) + elif isinstance(value, dict): + value = decode_dict(value) + rv[key] = value + return rv + + +def try_convert_from_str(string, target_type): + if target_type == basestring: + return string + if target_type == bool: + if string.lower() == 'true': + return True + if string.lower() == 'false': + return False + return string + try: + return target_type(string) + except ValueError: + return string + + def safe_str(value): """ Like :code:`str` coercion, but makes sure that Unicode strings are properly http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/utils/threading.py ---------------------------------------------------------------------- diff --git a/aria/utils/threading.py b/aria/utils/threading.py index b99250d..f4e9c0e 100644 --- a/aria/utils/threading.py +++ b/aria/utils/threading.py @@ -15,6 +15,7 @@ from __future__ import absolute_import # so we can import standard 'threading' +import sys import itertools import multiprocessing from threading import (Thread, Lock) @@ -255,3 +256,26 @@ class LockedList(list): def __exit__(self, the_type, value, traceback): return self.lock.__exit__(the_type, value, traceback) + + +class ExceptionThread(Thread): + """ + A thread from which top level exceptions can be retrieved or reraised + """ + def __init__(self, *args, **kwargs): + Thread.__init__(self, *args, **kwargs) + self.exception = None + + def run(self): + try: + super(ExceptionThread, self).run() + except BaseException: + self.exception = sys.exc_info() + + def is_error(self): + return self.exception is not None + + def raise_error_if_exists(self): + if self.is_error(): + t, v, tb = self.exception + raise t, v, tb http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/aria/utils/type.py ---------------------------------------------------------------------- diff --git a/aria/utils/type.py b/aria/utils/type.py new file mode 100644 index 0000000..e427be1 --- /dev/null +++ b/aria/utils/type.py @@ -0,0 +1,57 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def validate_value_type(value, type_name): + """Supports both python and yaml type names""" + #TODO add timestamp type? + + name_to_type = { + 'list': list, + 'dict': dict, + 'str': str, + 'unicode': str, + 'string': str, + 'int': int, + 'integer': int, + 'bool': bool, + 'boolean': bool, + 'float': float + } + + type = name_to_type.get(type_name.lower()) + if type is None: + raise ValueError('No supported type_name was provided') + try: + type(value) + except ValueError: + raise False + + +def convert_value_to_type(str_value, type_name): + try: + if type_name.lower() in ['str', 'unicode']: + return str_value.decode('utf-8') + elif type_name.lower() == 'int': + return int(str_value) + elif type_name.lower() == 'bool': + return bool(str_value) + elif type_name.lower() == 'float': + return float(str_value) + else: + raise ValueError('No supported type_name was provided') + except ValueError: + raise ValueError('Trying to convert {0} to {1} failed'.format(str_value, + type_name)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/extensions/aria_extension_tosca/simple_v1_0/modeling/__init__.py ---------------------------------------------------------------------- diff --git a/extensions/aria_extension_tosca/simple_v1_0/modeling/__init__.py b/extensions/aria_extension_tosca/simple_v1_0/modeling/__init__.py index 267f6de..e697bc2 100644 --- a/extensions/aria_extension_tosca/simple_v1_0/modeling/__init__.py +++ b/extensions/aria_extension_tosca/simple_v1_0/modeling/__init__.py @@ -19,6 +19,7 @@ Creates ARIA service template models based on the TOSCA presentation. Relies on many helper methods in the presentation classes. """ +import os import re from types import FunctionType from datetime import datetime @@ -34,7 +35,7 @@ from ..data_types import coerce_value def create_service_template_model(context): # pylint: disable=too-many-locals,too-many-branches model = ServiceTemplate(created_at=datetime.now(), - main_file_name=str(context.presentation.location)) + main_file_name=os.path.basename(str(context.presentation.location))) model.description = context.presentation.get('service_template', 'description', 'value') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/requirements.in ---------------------------------------------------------------------- diff --git a/requirements.in b/requirements.in index bc27479..2c7978f 100644 --- a/requirements.in +++ b/requirements.in @@ -25,6 +25,11 @@ SQLAlchemy>=1.1.0, <1.2 # version 1.2 dropped support of python 2.6 wagon==0.6.0 bottle>=0.12.0, <0.13 Fabric>=1.13.0, <1.14 +click==4.1 +colorama==0.3.3 +PrettyTable>=0.7,<0.8 +click_didyoumean==0.0.3 +backports.shutil_get_terminal_size==1.0.0 # Since the tool we are using to generate our requirements.txt, `pip-tools`, # does not currently support conditional dependencies (;), we're adding our original http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/setup.py ---------------------------------------------------------------------- diff --git a/setup.py b/setup.py index 3d72ebc..b64453a 100644 --- a/setup.py +++ b/setup.py @@ -61,7 +61,7 @@ except IOError: extras_require = {} -console_scripts = ['aria = aria.cli.cli:main'] +console_scripts = ['aria = aria.cli.main:main'] def _generate_user_options(command): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/mock/context.py ---------------------------------------------------------------------- diff --git a/tests/mock/context.py b/tests/mock/context.py index f943d7e..ac0a8a7 100644 --- a/tests/mock/context.py +++ b/tests/mock/context.py @@ -39,12 +39,17 @@ def simple(tmpdir, inmemory=False, context_kwargs=None, topology=None): api_kwargs=dict(directory=os.path.join(tmpdir, 'resources')) ) + service_id = topology(model_storage) + execution = models.create_execution(model_storage.service.get(service_id)) + model_storage.execution.put(execution) + final_kwargs = dict( name='simple_context', model_storage=model_storage, resource_storage=resource_storage, - service_id=topology(model_storage), + service_id=service_id, workflow_name=models.WORKFLOW_NAME, + execution_id=execution.id, task_max_attempts=models.TASK_MAX_ATTEMPTS, task_retry_interval=models.TASK_RETRY_INTERVAL ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/mock/models.py ---------------------------------------------------------------------- diff --git a/tests/mock/models.py b/tests/mock/models.py index 1d29e2d..6b7f810 100644 --- a/tests/mock/models.py +++ b/tests/mock/models.py @@ -40,7 +40,6 @@ from aria.orchestrator.workflows.builtin.workflows import ( SERVICE_NAME = 'test_service_name' SERVICE_TEMPLATE_NAME = 'test_service_template_name' WORKFLOW_NAME = 'test_workflow_name' -EXECUTION_NAME = 'test_execution_name' TASK_RETRY_INTERVAL = 1 TASK_MAX_ATTEMPTS = 1 @@ -168,6 +167,13 @@ def create_interface_template(service_template, interface_name, operation_name, def create_interface(service, interface_name, operation_name, operation_kwargs=None, interface_kwargs=None): the_type = service.service_template.interface_types.get_descendant('test_interface_type') + + if operation_kwargs and operation_kwargs.get('inputs'): + wrapped_inputs = {} + for input_name, input_value in operation_kwargs['inputs'].iteritems(): + wrapped_inputs[input_name] = models.Parameter.wrap(input_name, input_value) + operation_kwargs['inputs'] = wrapped_inputs + operation = models.Operation( name=operation_name, **(operation_kwargs or {}) @@ -183,10 +189,11 @@ def create_interface(service, interface_name, operation_name, operation_kwargs=N def create_execution(service): return models.Execution( service=service, - status=models.Execution.STARTED, + status=models.Execution.PENDING, workflow_name=WORKFLOW_NAME, + created_at=datetime.utcnow(), started_at=datetime.utcnow(), - parameters=None + inputs={} ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/modeling/test_models.py ---------------------------------------------------------------------- diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py index 8cd00f8..e459821 100644 --- a/tests/modeling/test_models.py +++ b/tests/modeling/test_models.py @@ -253,7 +253,7 @@ class TestService(object): class TestExecution(object): @pytest.mark.parametrize( - 'is_valid, created_at, started_at, ended_at, error, is_system_workflow, parameters, ' + 'is_valid, created_at, started_at, ended_at, error, is_system_workflow, inputs, ' 'status, workflow_name', [ (False, m_cls, now, now, 'error', False, {}, Execution.STARTED, 'wf_name'), @@ -268,11 +268,11 @@ class TestExecution(object): (True, now, None, now, 'error', False, {}, Execution.STARTED, 'wf_name'), (True, now, now, None, 'error', False, {}, Execution.STARTED, 'wf_name'), (True, now, now, now, None, False, {}, Execution.STARTED, 'wf_name'), - (True, now, now, now, 'error', False, None, Execution.STARTED, 'wf_name'), + (True, now, now, now, 'error', False, {}, Execution.STARTED, 'wf_name'), ] ) def test_execution_model_creation(self, service_storage, is_valid, created_at, started_at, - ended_at, error, is_system_workflow, parameters, status, + ended_at, error, is_system_workflow, inputs, status, workflow_name): execution = _test_model( is_valid=is_valid, @@ -285,7 +285,7 @@ class TestExecution(object): ended_at=ended_at, error=error, is_system_workflow=is_system_workflow, - parameters=parameters, + inputs=inputs, status=status, workflow_name=workflow_name, )) @@ -299,7 +299,7 @@ class TestExecution(object): id='e_id', workflow_name='w_name', status=status, - parameters={}, + inputs={}, created_at=now, ) return execution http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/context/test_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py index f55b83e..47c82dc 100644 --- a/tests/orchestrator/context/test_operation.py +++ b/tests/orchestrator/context/test_operation.py @@ -69,16 +69,17 @@ def test_node_operation_task_execution(ctx, thread_executor): interface_name = 'Standard' operation_name = 'create' + inputs = {'putput': True} node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) interface = mock.models.create_interface( node.service, interface_name, operation_name, - operation_kwargs=dict(implementation=op_path(basic_operation, module_path=__name__)) + operation_kwargs=dict(implementation=op_path(basic_operation, module_path=__name__), + inputs=inputs) ) node.interfaces[interface.name] = interface ctx.model.node.update(node) - inputs = {'putput': True} @workflow def basic_workflow(graph, **_): @@ -124,17 +125,18 @@ def test_relationship_operation_task_execution(ctx, thread_executor): interface_name = 'Configure' operation_name = 'post_configure' + inputs = {'putput': True} relationship = ctx.model.relationship.list()[0] interface = mock.models.create_interface( relationship.source_node.service, interface_name, operation_name, - operation_kwargs=dict(implementation=op_path(basic_operation, module_path=__name__)), + operation_kwargs=dict(implementation=op_path(basic_operation, module_path=__name__), + inputs=inputs), ) relationship.interfaces[interface.name] = interface ctx.model.relationship.update(relationship) - inputs = {'putput': True} @workflow def basic_workflow(graph, **_): @@ -232,21 +234,21 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir): ctx.model.plugin.put(plugin) plugin_specification = mock.models.create_plugin_specification() node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + filename = 'test_file' + content = 'file content' + inputs = {'filename': filename, 'content': content} interface = mock.models.create_interface( node.service, interface_name, operation_name, operation_kwargs=dict( implementation='{0}.{1}'.format(__name__, _test_plugin_workdir.__name__), - plugin_specification=plugin_specification) + plugin_specification=plugin_specification, + inputs=inputs) ) node.interfaces[interface.name] = interface ctx.model.node.update(node) - filename = 'test_file' - content = 'file content' - inputs = {'filename': filename, 'content': content} - @workflow def basic_workflow(graph, **_): graph.add_tasks(api.task.OperationTask.for_node(node=node, @@ -278,21 +280,22 @@ def test_node_operation_logging(ctx, executor): interface_name, operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0] node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + + inputs = { + 'op_start': 'op_start', + 'op_end': 'op_end', + } interface = mock.models.create_interface( node.service, interface_name, operation_name, operation_kwargs=dict( - implementation=op_path(logged_operation, module_path=__name__)) + implementation=op_path(logged_operation, module_path=__name__), + inputs=inputs) ) node.interfaces[interface.name] = interface ctx.model.node.update(node) - inputs = { - 'op_start': 'op_start', - 'op_end': 'op_end', - } - @workflow def basic_workflow(graph, **_): graph.add_tasks( @@ -312,20 +315,20 @@ def test_relationship_operation_logging(ctx, executor): interface_name, operation_name = mock.operations.RELATIONSHIP_OPERATIONS_INSTALL[0] relationship = ctx.model.relationship.list()[0] + inputs = { + 'op_start': 'op_start', + 'op_end': 'op_end', + } interface = mock.models.create_interface( relationship.source_node.service, interface_name, operation_name, - operation_kwargs=dict(implementation=op_path(logged_operation, module_path=__name__)) + operation_kwargs=dict(implementation=op_path(logged_operation, module_path=__name__), + inputs=inputs) ) relationship.interfaces[interface.name] = interface ctx.model.relationship.update(relationship) - inputs = { - 'op_start': 'op_start', - 'op_end': 'op_end', - } - @workflow def basic_workflow(graph, **_): graph.add_tasks( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/context/test_resource_render.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_resource_render.py b/tests/orchestrator/context/test_resource_render.py index 696e9b3..8113746 100644 --- a/tests/orchestrator/context/test_resource_render.py +++ b/tests/orchestrator/context/test_resource_render.py @@ -64,9 +64,9 @@ def resources(tmpdir, ctx): implicit_ctx_template_path.write(_IMPLICIT_CTX_TEMPLATE) variables_template_path = tmpdir.join(_VARIABLES_TEMPLATE_PATH) variables_template_path.write(_VARIABLES_TEMPLATE) - ctx.resource.deployment.upload(entry_id='1', + ctx.resource.service.upload(entry_id='1', source=str(implicit_ctx_template_path), path=_IMPLICIT_CTX_TEMPLATE_PATH) - ctx.resource.deployment.upload(entry_id='1', + ctx.resource.service.upload(entry_id='1', source=str(variables_template_path), path=_VARIABLES_TEMPLATE_PATH) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/context/test_serialize.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py index db45e8e..5fdb674 100644 --- a/tests/orchestrator/context/test_serialize.py +++ b/tests/orchestrator/context/test_serialize.py @@ -34,7 +34,7 @@ def test_serialize_operation_context(context, executor, tmpdir): test_file = tmpdir.join(TEST_FILE_NAME) test_file.write(TEST_FILE_CONTENT) resource = context.resource - resource.blueprint.upload(TEST_FILE_ENTRY_ID, str(test_file)) + resource.service_template.upload(TEST_FILE_ENTRY_ID, str(test_file)) graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph) eng.execute() @@ -73,7 +73,7 @@ def _mock_operation(ctx): # a correct ctx.deployment.name tells us we kept the correct deployment_id assert ctx.service.name == mock.models.SERVICE_NAME # Here we test that the resource storage was properly re-created - test_file_content = ctx.resource.blueprint.read(TEST_FILE_ENTRY_ID, TEST_FILE_NAME) + test_file_content = ctx.resource.service_template.read(TEST_FILE_ENTRY_ID, TEST_FILE_NAME) assert test_file_content == TEST_FILE_CONTENT # a non empty plugin workdir tells us that we kept the correct base_workdir assert ctx.plugin_workdir is not None http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/context/test_toolbelt.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_toolbelt.py b/tests/orchestrator/context/test_toolbelt.py index cf82127..213d964 100644 --- a/tests/orchestrator/context/test_toolbelt.py +++ b/tests/orchestrator/context/test_toolbelt.py @@ -76,15 +76,16 @@ def test_host_ip(workflow_context, executor): interface_name = 'Standard' operation_name = 'create' _, dependency_node, _, _, _ = _get_elements(workflow_context) + inputs = {'putput': True} interface = mock.models.create_interface( dependency_node.service, interface_name=interface_name, operation_name=operation_name, - operation_kwargs=dict(implementation=op_path(host_ip, module_path=__name__)) + operation_kwargs=dict(implementation=op_path(host_ip, module_path=__name__), + inputs=inputs) ) dependency_node.interfaces[interface.name] = interface workflow_context.model.node.update(dependency_node) - inputs = {'putput': True} @workflow def basic_workflow(graph, **_): @@ -106,17 +107,17 @@ def test_relationship_tool_belt(workflow_context, executor): interface_name = 'Configure' operation_name = 'post_configure' _, _, _, _, relationship = _get_elements(workflow_context) + inputs = {'putput': True} interface = mock.models.create_interface( relationship.source_node.service, interface_name=interface_name, operation_name=operation_name, - operation_kwargs=dict(implementation=op_path(relationship_operation, module_path=__name__)) + operation_kwargs=dict(implementation=op_path(relationship_operation, module_path=__name__), + inputs=inputs) ) relationship.interfaces[interface.name] = interface workflow_context.model.relationship.update(relationship) - inputs = {'putput': True} - @workflow def basic_workflow(graph, **_): graph.add_tasks( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/context/test_workflow.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_workflow.py b/tests/orchestrator/context/test_workflow.py index fa1f387..3c35435 100644 --- a/tests/orchestrator/context/test_workflow.py +++ b/tests/orchestrator/context/test_workflow.py @@ -35,7 +35,7 @@ class TestWorkflowContext(object): assert execution.service_template == storage.service_template.get_by_name( models.SERVICE_TEMPLATE_NAME) assert execution.status == storage.execution.model_cls.PENDING - assert execution.parameters == {} + assert execution.inputs == {} assert execution.created_at <= datetime.utcnow() def test_subsequent_workflow_context_creation_do_not_fail(self, storage): @@ -49,11 +49,13 @@ class TestWorkflowContext(object): :param storage: :return WorkflowContext: """ + service = storage.service.get_by_name(models.SERVICE_NAME) return context.workflow.WorkflowContext( name='simple_context', model_storage=storage, resource_storage=None, - service_id=storage.service.get_by_name(models.SERVICE_NAME).id, + service_id=service, + execution_id=storage.execution.list(filters=dict(service=service))[0].id, workflow_name=models.WORKFLOW_NAME, task_max_attempts=models.TASK_MAX_ATTEMPTS, task_retry_interval=models.TASK_RETRY_INTERVAL @@ -66,6 +68,8 @@ def storage(): sql_mapi.SQLAlchemyModelAPI, initiator=test_storage.init_inmemory_model_storage) workflow_storage.service_template.put(models.create_service_template()) service_template = workflow_storage.service_template.get_by_name(models.SERVICE_TEMPLATE_NAME) - workflow_storage.service.put(models.create_service(service_template)) + service = models.create_service(service_template) + workflow_storage.service.put(service) + workflow_storage.execution.put(models.create_execution(service)) yield workflow_storage test_storage.release_sqlite_storage(workflow_storage) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index e3612cf..67d527c 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -460,14 +460,15 @@ if __name__ == '__main__': env_var='value', inputs=None): local_script_path = script_path - script_path = os.path.basename(local_script_path) if local_script_path else None + script_path = os.path.basename(local_script_path) if local_script_path else '' + inputs = inputs or {} + process = process or {} if script_path: - workflow_context.resource.deployment.upload( + workflow_context.resource.service.upload( entry_id=str(workflow_context.service.id), source=local_script_path, path=script_path) - inputs = inputs or {} inputs.update({ 'script_path': script_path, 'process': process, @@ -483,7 +484,8 @@ if __name__ == '__main__': 'op', operation_kwargs=dict(implementation='{0}.{1}'.format( operations.__name__, - operations.run_script_locally.__name__)) + operations.run_script_locally.__name__), + inputs=inputs) ) node.interfaces[interface.name] = interface graph.add_tasks(api.task.OperationTask.for_node( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/execution_plugin/test_ssh.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py index dd36466..d17def1 100644 --- a/tests/orchestrator/execution_plugin/test_ssh.py +++ b/tests/orchestrator/execution_plugin/test_ssh.py @@ -258,7 +258,7 @@ class TestWithActualSSHServer(object): return collected[signal][0]['kwargs']['exception'] def _upload(self, source, path): - self._workflow_context.resource.deployment.upload( + self._workflow_context.resource.service.upload( entry_id=str(self._workflow_context.service.id), source=source, path=path) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/workflows/builtin/test_execute_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/builtin/test_execute_operation.py b/tests/orchestrator/workflows/builtin/test_execute_operation.py index 360e17d..41e2b6b 100644 --- a/tests/orchestrator/workflows/builtin/test_execute_operation.py +++ b/tests/orchestrator/workflows/builtin/test_execute_operation.py @@ -34,7 +34,8 @@ def test_execute_operation(ctx): interface = mock.models.create_interface( ctx.service, interface_name, - operation_name + operation_name, + operation_kwargs={'implementation': 'stub-implementation'} ) node.interfaces[interface.name] = interface ctx.model.node.update(node) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py index 6f97952..e793c49 100644 --- a/tests/orchestrator/workflows/core/test_engine.py +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -61,12 +61,18 @@ class BaseTest(object): retry_interval=None, ignore_failure=None): node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + + operation_kwargs = dict(implementation='{name}.{func.__name__}'.format( + name=__name__, func=func)) + if inputs: + # the operation has to declare the inputs before those may be passed + operation_kwargs['inputs'] = inputs + interface = mock.models.create_interface( node.service, 'aria.interfaces.lifecycle', 'create', - operation_kwargs=dict(implementation='{name}.{func.__name__}'.format(name=__name__, - func=func)) + operation_kwargs=operation_kwargs ) node.interfaces[interface.name] = interface return api.task.OperationTask.for_node( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py deleted file mode 100644 index 0a95d43..0000000 --- a/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py +++ /dev/null @@ -1,111 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from networkx import topological_sort, DiGraph - -from aria.orchestrator import context -from aria.orchestrator.workflows import api, core - -from tests import mock -from tests import storage - - -def test_task_graph_into_execution_graph(tmpdir): - interface_name = 'Standard' - operation_name = 'create' - task_context = mock.context.simple(str(tmpdir)) - node = task_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) - interface = mock.models.create_interface( - node.service, - interface_name, - operation_name - ) - node.interfaces[interface.name] = interface - task_context.model.node.update(node) - - def sub_workflow(name, **_): - return api.task_graph.TaskGraph(name) - - with context.workflow.current.push(task_context): - test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph') - simple_before_task = api.task.OperationTask.for_node(node=node, - interface_name=interface_name, - operation_name=operation_name) - simple_after_task = api.task.OperationTask.for_node(node=node, - interface_name=interface_name, - operation_name=operation_name) - - inner_task_graph = api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph') - inner_task = api.task.OperationTask.for_node(node=node, - interface_name=interface_name, - operation_name=operation_name) - inner_task_graph.add_tasks(inner_task) - - test_task_graph.add_tasks(simple_before_task) - test_task_graph.add_tasks(simple_after_task) - test_task_graph.add_tasks(inner_task_graph) - test_task_graph.add_dependency(inner_task_graph, simple_before_task) - test_task_graph.add_dependency(simple_after_task, inner_task_graph) - - # Direct check - execution_graph = DiGraph() - core.translation.build_execution_graph(task_graph=test_task_graph, - execution_graph=execution_graph) - execution_tasks = topological_sort(execution_graph) - - assert len(execution_tasks) == 7 - - expected_tasks_names = [ - '{0}-Start'.format(test_task_graph.id), - simple_before_task.id, - '{0}-Start'.format(inner_task_graph.id), - inner_task.id, - '{0}-End'.format(inner_task_graph.id), - simple_after_task.id, - '{0}-End'.format(test_task_graph.id) - ] - - assert expected_tasks_names == execution_tasks - - assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph), - core.task.StartWorkflowTask) - - _assert_execution_is_api_task(_get_task_by_name(execution_tasks[1], execution_graph), - simple_before_task) - assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph), - core.task.StartSubWorkflowTask) - - _assert_execution_is_api_task(_get_task_by_name(execution_tasks[3], execution_graph), - inner_task) - assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph), - core.task.EndSubWorkflowTask) - - _assert_execution_is_api_task(_get_task_by_name(execution_tasks[5], execution_graph), - simple_after_task) - assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph), - core.task.EndWorkflowTask) - storage.release_sqlite_storage(task_context.model) - - -def _assert_execution_is_api_task(execution_task, api_task): - assert execution_task.id == api_task.id - assert execution_task.name == api_task.name - assert execution_task.implementation == api_task.implementation - assert execution_task.actor == api_task.actor - assert execution_task.inputs == api_task.inputs - - -def _get_task_by_name(task_name, graph): - return graph.node[task_name]['task'] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py new file mode 100644 index 0000000..0a95d43 --- /dev/null +++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py @@ -0,0 +1,111 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from networkx import topological_sort, DiGraph + +from aria.orchestrator import context +from aria.orchestrator.workflows import api, core + +from tests import mock +from tests import storage + + +def test_task_graph_into_execution_graph(tmpdir): + interface_name = 'Standard' + operation_name = 'create' + task_context = mock.context.simple(str(tmpdir)) + node = task_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface = mock.models.create_interface( + node.service, + interface_name, + operation_name + ) + node.interfaces[interface.name] = interface + task_context.model.node.update(node) + + def sub_workflow(name, **_): + return api.task_graph.TaskGraph(name) + + with context.workflow.current.push(task_context): + test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph') + simple_before_task = api.task.OperationTask.for_node(node=node, + interface_name=interface_name, + operation_name=operation_name) + simple_after_task = api.task.OperationTask.for_node(node=node, + interface_name=interface_name, + operation_name=operation_name) + + inner_task_graph = api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph') + inner_task = api.task.OperationTask.for_node(node=node, + interface_name=interface_name, + operation_name=operation_name) + inner_task_graph.add_tasks(inner_task) + + test_task_graph.add_tasks(simple_before_task) + test_task_graph.add_tasks(simple_after_task) + test_task_graph.add_tasks(inner_task_graph) + test_task_graph.add_dependency(inner_task_graph, simple_before_task) + test_task_graph.add_dependency(simple_after_task, inner_task_graph) + + # Direct check + execution_graph = DiGraph() + core.translation.build_execution_graph(task_graph=test_task_graph, + execution_graph=execution_graph) + execution_tasks = topological_sort(execution_graph) + + assert len(execution_tasks) == 7 + + expected_tasks_names = [ + '{0}-Start'.format(test_task_graph.id), + simple_before_task.id, + '{0}-Start'.format(inner_task_graph.id), + inner_task.id, + '{0}-End'.format(inner_task_graph.id), + simple_after_task.id, + '{0}-End'.format(test_task_graph.id) + ] + + assert expected_tasks_names == execution_tasks + + assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph), + core.task.StartWorkflowTask) + + _assert_execution_is_api_task(_get_task_by_name(execution_tasks[1], execution_graph), + simple_before_task) + assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph), + core.task.StartSubWorkflowTask) + + _assert_execution_is_api_task(_get_task_by_name(execution_tasks[3], execution_graph), + inner_task) + assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph), + core.task.EndSubWorkflowTask) + + _assert_execution_is_api_task(_get_task_by_name(execution_tasks[5], execution_graph), + simple_after_task) + assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph), + core.task.EndWorkflowTask) + storage.release_sqlite_storage(task_context.model) + + +def _assert_execution_is_api_task(execution_task, api_task): + assert execution_task.id == api_task.id + assert execution_task.name == api_task.name + assert execution_task.implementation == api_task.implementation + assert execution_task.actor == api_task.actor + assert execution_task.inputs == api_task.inputs + + +def _get_task_by_name(task_name, graph): + return graph.node[task_name]['task'] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9852f278/tests/utils/test_threading.py ---------------------------------------------------------------------- diff --git a/tests/utils/test_threading.py b/tests/utils/test_threading.py new file mode 100644 index 0000000..39ce717 --- /dev/null +++ b/tests/utils/test_threading.py @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import pytest + +from aria.utils import threading + + +class TestPluginManager(object): + + def test_exception_raised_from_thread(self): + + def error_raising_func(): + raise ValueError('This is an error') + + thread = threading.ExceptionThread(target=error_raising_func) + thread.start() + thread.join() + + assert thread.is_error() + with pytest.raises(ValueError): + thread.raise_error_if_exists()
