http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/modeling/service_template.py ---------------------------------------------------------------------- diff --git a/aria/modeling/service_template.py b/aria/modeling/service_template.py index 51fea2f..f1c2bcb 100644 --- a/aria/modeling/service_template.py +++ b/aria/modeling/service_template.py @@ -280,7 +280,7 @@ class ServiceTemplateBase(TemplateModelMixin): ('interface_types', formatting.as_raw(self.interface_types)), ('artifact_types', formatting.as_raw(self.artifact_types)))) - def instantiate(self, container): + def instantiate(self, container, model_storage, inputs=None): # pylint: disable=arguments-differ from . import models context = ConsumptionContext.get_thread_local() now = datetime.now() @@ -288,13 +288,14 @@ class ServiceTemplateBase(TemplateModelMixin): updated_at=now, description=deepcopy_with_locators(self.description), service_template=self) - #service.name = '{0}_{1}'.format(self.name, service.id) - context.modeling.instance = service + service.inputs = utils.create_inputs(inputs or {}, self.inputs) + # TODO: now that we have inputs, we should scan properties and inputs and evaluate functions + for plugin_specification in self.plugin_specifications.itervalues(): if plugin_specification.enabled: - if plugin_specification.resolve(): + if plugin_specification.resolve(model_storage): plugin = plugin_specification.plugin service.plugins[plugin.name] = plugin else: @@ -316,15 +317,8 @@ class ServiceTemplateBase(TemplateModelMixin): if self.substitution_template is not None: service.substitution = self.substitution_template.instantiate(container) - utils.instantiate_dict(self, service.inputs, self.inputs) utils.instantiate_dict(self, service.outputs, self.outputs) - for name, the_input in context.modeling.inputs.iteritems(): - if name not in service.inputs: - context.validation.report('input "{0}" is not supported'.format(name)) - else: - service.inputs[name].value = the_input - 
return service def validate(self): @@ -448,8 +442,7 @@ class NodeTemplateBase(TemplateModelMixin): __tablename__ = 'node_template' __private_fields__ = ['type_fk', - 'service_template_fk', - 'service_template_name'] + 'service_template_fk'] # region foreign_keys @@ -472,6 +465,11 @@ class NodeTemplateBase(TemplateModelMixin): """Required for use by SQLAlchemy queries""" return association_proxy('service_template', 'name') + @declared_attr + def type_name(cls): + """Required for use by SQLAlchemy queries""" + return association_proxy('type', 'name') + # endregion # region one_to_one relationships @@ -558,6 +556,7 @@ class NodeTemplateBase(TemplateModelMixin): type=self.type, description=deepcopy_with_locators(self.description), state=models.Node.INITIAL, + runtime_properties={}, node_template=self) utils.instantiate_dict(node, node.properties, self.properties) utils.instantiate_dict(node, node.interfaces, self.interface_templates) @@ -1238,7 +1237,8 @@ class RequirementTemplateBase(TemplateModelMixin): # Find first node that matches the type elif self.target_node_type is not None: - for target_node_template in context.modeling.template.node_templates.itervalues(): + for target_node_template in \ + self.node_template.service_template.node_templates.values(): if self.target_node_type.get_descendant(target_node_template.type.name) is None: continue @@ -1865,16 +1865,22 @@ class OperationTemplateBase(TemplateModelMixin): def instantiate(self, container): from . import models - if self.plugin_specification and self.plugin_specification.enabled: - plugin = self.plugin_specification.plugin - implementation = self.implementation if plugin is not None else None - # "plugin" would be none if a match was not found. 
In that case, a validation error - # should already have been reported in ServiceTemplateBase.instantiate, so we will - # continue silently here + if self.plugin_specification: + if self.plugin_specification.enabled: + plugin = self.plugin_specification.plugin + implementation = self.implementation if plugin is not None else None + # "plugin" would be none if a match was not found. In that case, a validation error + # should already have been reported in ServiceTemplateBase.instantiate, so we will + # continue silently here + else: + # If the plugin is disabled, the operation should be disabled, too + plugin = None + implementation = None else: - # If the plugin is disabled, the operation should be disabled, too + # using the execution plugin plugin = None - implementation = None + implementation = self.implementation + operation = models.Operation(name=self.name, description=deepcopy_with_locators(self.description), relationship_edge=self.relationship_edge, @@ -2120,25 +2126,16 @@ class PluginSpecificationBase(TemplateModelMixin): def coerce_values(self, container, report_issues): pass - def resolve(self): + def resolve(self, model_storage): # TODO: we are planning a separate "instantiation" module where this will be called or - # moved to. There, we will probably have a context with a storage manager. Until then, - # this is the only potentially available context, which of course will only be available - # if we're in a workflow. - from ..orchestrator import context - try: - workflow_context = context.workflow.current.get() - plugins = workflow_context.model.plugin.list() - except context.exceptions.ContextException: - plugins = None - + # moved to. 
+ plugins = model_storage.plugin.list() matching_plugins = [] - if plugins: - for plugin in plugins: - # TODO: we need to use a version comparator - if (plugin.name == self.name) and \ + for plugin in plugins: + # TODO: we need to use a version comparator + if (plugin.name == self.name) and \ ((self.version is None) or (plugin.package_version >= self.version)): - matching_plugins.append(plugin) + matching_plugins.append(plugin) self.plugin = None if matching_plugins: # Return highest version of plugin
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/modeling/utils.py ---------------------------------------------------------------------- diff --git a/aria/modeling/utils.py b/aria/modeling/utils.py index 0b4015c..91d7b9c 100644 --- a/aria/modeling/utils.py +++ b/aria/modeling/utils.py @@ -13,12 +13,100 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os +from json import JSONEncoder +from StringIO import StringIO + +from . import exceptions from ..parser.consumption import ConsumptionContext from ..parser.exceptions import InvalidValueError from ..parser.presentation import Value from ..utils.collections import OrderedDict from ..utils.console import puts -from .exceptions import CannotEvaluateFunctionException +from ..utils.type import validate_value_type + + +class ModelJSONEncoder(JSONEncoder): + def default(self, o): # pylint: disable=method-hidden + from .mixins import ModelMixin + if isinstance(o, ModelMixin): + if hasattr(o, 'value'): + dict_to_return = o.to_dict(fields=('value',)) + return dict_to_return['value'] + else: + return o.to_dict() + else: + return JSONEncoder.default(self, o) + + +def create_inputs(inputs, template_inputs): + """ + :param inputs: key-value dict + :param template_inputs: parameter name to parameter object dict + :return: dict of parameter name to Parameter models + """ + merged_inputs = _merge_and_validate_inputs(inputs, template_inputs) + + from . 
import models + input_models = [] + for input_name, input_val in merged_inputs.iteritems(): + parameter = models.Parameter( + name=input_name, + type_name=template_inputs[input_name].type_name, + description=template_inputs[input_name].description, + value=input_val) + input_models.append(parameter) + + return dict((inp.name, inp) for inp in input_models) + + +def _merge_and_validate_inputs(inputs, template_inputs): + """ + :param inputs: key-value dict + :param template_inputs: parameter name to parameter object dict + :return: + """ + merged_inputs = inputs.copy() + + missing_inputs = [] + wrong_type_inputs = {} + for input_name, input_template in template_inputs.iteritems(): + if input_name not in inputs: + if input_template.value is not None: + merged_inputs[input_name] = input_template.value # apply default value + else: + missing_inputs.append(input_name) + else: + # Validate input type + try: + validate_value_type(inputs[input_name], input_template.type_name) + except ValueError: + wrong_type_inputs[input_name] = input_template.type_name + except RuntimeError: + # TODO: This error shouldn't be raised (or caught), but right now we lack support + # for custom data_types, which will raise this error. Skipping their validation. 
+ pass + + if missing_inputs: + raise exceptions.MissingRequiredInputsException( + 'Required inputs {0} have not been specified - expected inputs: {1}' + .format(missing_inputs, template_inputs.keys())) + + if wrong_type_inputs: + error_message = StringIO() + for param_name, param_type in wrong_type_inputs.iteritems(): + error_message.write('Input "{0}" must be of type {1}{2}' + .format(param_name, param_type, os.linesep)) + raise exceptions.InputsOfWrongTypeException(error_message.getvalue()) + + undeclared_inputs = [input_name for input_name in inputs.keys() + if input_name not in template_inputs] + if undeclared_inputs: + raise exceptions.UndeclaredInputsException( + 'Undeclared inputs have been specified: {0}; Expected inputs: {1}' + .format(undeclared_inputs, template_inputs.keys())) + + return merged_inputs def coerce_value(container, value, report_issues=False): @@ -35,7 +123,7 @@ def coerce_value(container, value, report_issues=False): try: value = value._evaluate(context, container) value = coerce_value(container, value, report_issues) - except CannotEvaluateFunctionException: + except exceptions.CannotEvaluateFunctionException: pass except InvalidValueError as e: if report_issues: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 127641f..15843db 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -19,7 +19,6 @@ A common context for both workflow and operation import logging from contextlib import contextmanager -from datetime import datetime from functools import partial import jinja2 @@ -55,6 +54,7 @@ class BaseContext(object): self, name, service_id, + execution_id, model_storage, resource_storage, workdir=None, @@ -65,27 +65,17 @@ class BaseContext(object): self._model = model_storage 
self._resource = resource_storage self._service_id = service_id + self._execution_id = execution_id self._workdir = workdir self.logger = None - def _create_execution(self): - now = datetime.utcnow() - execution = self.model.execution.model_cls( - service_instance=self.service_instance, - workflow_name=self._workflow_name, - created_at=now, - parameters=self.parameters, - ) - self.model.execution.put(execution) - return execution.id - - def _register_logger(self, logger_name=None, level=None, task_id=None): - self.logger = self.PrefixedLogger(logging.getLogger(logger_name or self.__class__.__name__), - self.logging_id, - task_id=task_id) - self.logger.addHandler(aria_logger.create_console_log_handler()) - self.logger.addHandler(self._get_sqla_handler()) + def _register_logger(self, level=None, task_id=None): + self.logger = self.PrefixedLogger( + logging.getLogger(aria_logger.TASK_LOGGER_NAME), self.logging_id, task_id=task_id) self.logger.setLevel(level or logging.DEBUG) + if not self.logger.handlers: + self.logger.addHandler(aria_logger.create_console_log_handler()) + self.logger.addHandler(self._get_sqla_handler()) def _get_sqla_handler(self): api_kwargs = {} @@ -168,13 +158,13 @@ class BaseContext(object): Download a blueprint resource from the resource storage """ try: - self.resource.deployment.download(entry_id=str(self.service.id), - destination=destination, - path=path) + self.resource.service.download(entry_id=str(self.service.id), + destination=destination, + path=path) except exceptions.StorageError: - self.resource.blueprint.download(entry_id=str(self.service_template.id), - destination=destination, - path=path) + self.resource.service_template.download(entry_id=str(self.service_template.id), + destination=destination, + path=path) def download_resource_and_render(self, destination, path=None, variables=None): """ @@ -193,9 +183,10 @@ class BaseContext(object): Read a deployment resource as string from the resource storage """ try: - return 
self.resource.deployment.read(entry_id=str(self.service.id), path=path) + return self.resource.service.read(entry_id=str(self.service.id), path=path) except exceptions.StorageError: - return self.resource.deployment.read(entry_id=str(self.service_template.id), path=path) + return self.resource.service_template.read(entry_id=str(self.service_template.id), + path=path) def get_resource_and_render(self, path=None, variables=None): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index cbd186c..c7d8246 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -36,7 +36,6 @@ class BaseOperationContext(BaseContext): service_id, task_id, actor_id, - execution_id, **kwargs): super(BaseOperationContext, self).__init__( name=name, @@ -47,7 +46,6 @@ class BaseOperationContext(BaseContext): self._task_id = task_id self._actor_id = actor_id self._thread_local = threading.local() - self._execution_id = execution_id self._register_logger(task_id=self.task.id) def __repr__(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py index 5f86d9d..667d22f 100644 --- a/aria/orchestrator/context/workflow.py +++ b/aria/orchestrator/context/workflow.py @@ -19,7 +19,6 @@ Workflow and operation contexts import threading from contextlib import contextmanager -from datetime import datetime from .exceptions import ContextException from .common import BaseContext @@ -35,36 +34,21 @@ class WorkflowContext(BaseContext): task_max_attempts=1, task_retry_interval=0, task_ignore_failure=False, - execution_id=None, 
*args, **kwargs): super(WorkflowContext, self).__init__(*args, **kwargs) self._workflow_name = workflow_name - self.parameters = parameters or {} + self._parameters = parameters or {} self._task_max_attempts = task_max_attempts self._task_retry_interval = task_retry_interval self._task_ignore_failure = task_ignore_failure - # TODO: execution creation should happen somewhere else - # should be moved there, when such logical place exists - self._execution_id = execution_id or self._create_execution() self._register_logger() def __repr__(self): return ( '{name}(deployment_id={self._service_id}, ' - 'workflow_name={self._workflow_name}'.format( + 'workflow_name={self._workflow_name}, execution_id={self._execution_id})'.format( name=self.__class__.__name__, self=self)) - def _create_execution(self): - now = datetime.utcnow() - execution = self.model.execution.model_cls( - service=self.service, - workflow_name=self._workflow_name, - created_at=now, - parameters=self.parameters, - ) - self.model.execution.put(execution) - return execution.id - @property def logging_id(self): return '{0}[{1}]'.format(self._workflow_name, self._execution_id) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py index c00b66b..8d3dcc6 100644 --- a/aria/orchestrator/exceptions.py +++ b/aria/orchestrator/exceptions.py @@ -25,6 +25,13 @@ class OrchestratorError(AriaError): pass +class InvalidPluginError(AriaError): + """ + Raised when an invalid plugin is validated unsuccessfully + """ + pass + + class PluginAlreadyExistsError(AriaError): """ Raised when a plugin with the same package name and package version already exists @@ -46,3 +53,24 @@ class TaskAbortException(RuntimeError): Used internally when ctx.task.abort is called """ pass + + +class UndeclaredWorkflowError(AriaError): + """ + Raised 
when attempting to execute an undeclared workflow + """ + pass + + +class ActiveExecutionsError(AriaError): + """ + Raised when attempting to execute a workflow on a service which already has an active execution + """ + pass + + +class WorkflowImplementationNotFoundError(AriaError): + """ + Raised when attempting to import a workflow's code but the implementation is not found + """ + pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/execution_plugin/ctx_proxy/server.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py index 817d064..52a5312 100644 --- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py +++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py @@ -24,6 +24,7 @@ import StringIO import wsgiref.simple_server import bottle +from aria import modeling from .. import exceptions @@ -111,7 +112,7 @@ class CtxProxy(object): result = json.dumps({ 'type': result_type, 'payload': payload - }) + }, cls=modeling.utils.ModelJSONEncoder) except Exception as e: traceback_out = StringIO.StringIO() traceback.print_exc(file=traceback_out) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/execution_plugin/instantiation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/instantiation.py b/aria/orchestrator/execution_plugin/instantiation.py index 960835c..7627a38 100644 --- a/aria/orchestrator/execution_plugin/instantiation.py +++ b/aria/orchestrator/execution_plugin/instantiation.py @@ -27,7 +27,7 @@ def configure_operation(operation): arguments = OrderedDict() arguments['script_path'] = operation.implementation arguments['process'] = _get_process(configuration.pop('process')) \ - if 'process' in configuration else None + if 'process' in configuration else dict() 
host = None interface = operation.interface http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/plugin.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/plugin.py b/aria/orchestrator/plugin.py index d815754..f99666c 100644 --- a/aria/orchestrator/plugin.py +++ b/aria/orchestrator/plugin.py @@ -17,6 +17,7 @@ import os import tempfile import subprocess import sys +import zipfile from datetime import datetime import wagon @@ -43,11 +44,11 @@ class PluginManager(object): os_props = metadata['build_server_os_properties'] plugin = cls( + name=metadata['package_name'], archive_name=metadata['archive_name'], supported_platform=metadata['supported_platform'], supported_py_versions=metadata['supported_python_versions'], - # Remove suffix colon after upgrading wagon to > 0.5.0 - distribution=os_props.get('distribution:') or os_props.get('distribution'), + distribution=os_props.get('distribution'), distribution_release=os_props['distribution_version'], distribution_version=os_props['distribution_release'], package_name=metadata['package_name'], @@ -70,6 +71,28 @@ class PluginManager(object): self._plugins_dir, '{0}-{1}'.format(plugin.package_name, plugin.package_version)) + @staticmethod + def validate_plugin(source): + """ + validate a plugin archive. + A valid plugin is a wagon (http://github.com/cloudify-cosmo/wagon) + in the zip format (suffix may also be .wgn). + """ + if not zipfile.is_zipfile(source): + raise exceptions.InvalidPluginError( + 'Archive {0} is of an unsupported type. 
Only ' + 'zip/wgn is allowed'.format(source)) + with zipfile.ZipFile(source, 'r') as zip_file: + infos = zip_file.infolist() + try: + package_name = infos[0].filename[:infos[0].filename.index('/')] + package_json_path = "{0}/{1}".format(package_name, 'package.json') + zip_file.getinfo(package_json_path) + except (KeyError, ValueError, IndexError): + raise exceptions.InvalidPluginError( + 'Failed to validate plugin {0} ' + '(package.json was not found in archive)'.format(source)) + def _install_wagon(self, source, prefix): pip_freeze_output = self._pip_freeze() file_descriptor, constraint_path = tempfile.mkstemp(prefix='constraint-', suffix='.txt') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/runner.py b/aria/orchestrator/runner.py deleted file mode 100644 index f1633fa..0000000 --- a/aria/orchestrator/runner.py +++ /dev/null @@ -1,101 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -""" -Workflow runner -""" - -import tempfile -import os - -from .context.workflow import WorkflowContext -from .workflows.core.engine import Engine -from .workflows.executor.thread import ThreadExecutor -from ..storage import ( - sql_mapi, - filesystem_rapi, -) -from .. import ( - application_model_storage, - application_resource_storage -) - - -class Runner(object): - """ - Runs workflows on a deployment. By default uses temporary storage (either on disk or in memory) - but can also be used with existing storage. - - Handles the initialization of the storage engine and provides convenience methods for - sub-classes to create tasks. - - :param path: path to Sqlite database file; use '' (the default) to use a temporary file, - and None to use an in-memory database - :type path: string - """ - - def __init__(self, workflow_name, workflow_fn, inputs, initialize_model_storage_fn, - service_id_fn, storage_path='', is_storage_temporary=True): - if storage_path == '': - # Temporary file storage - the_file, storage_path = tempfile.mkstemp(suffix='.db', prefix='aria-') - os.close(the_file) - - self._storage_path = storage_path - self._storage_dir = os.path.dirname(storage_path) - self._storage_name = os.path.basename(storage_path) - self._is_storage_temporary = is_storage_temporary - - workflow_context = self.create_workflow_context(workflow_name, initialize_model_storage_fn, - service_id_fn) - - tasks_graph = workflow_fn(ctx=workflow_context, **inputs) - - self._engine = Engine( - executor=ThreadExecutor(), - workflow_context=workflow_context, - tasks_graph=tasks_graph) - - def run(self): - try: - self._engine.execute() - finally: - self.cleanup() - - def create_workflow_context(self, - workflow_name, - initialize_model_storage_fn, - service_id_fn): - self.cleanup() - model_storage = application_model_storage( - sql_mapi.SQLAlchemyModelAPI, - initiator_kwargs=dict(base_dir=self._storage_dir, filename=self._storage_name)) - if initialize_model_storage_fn: - 
initialize_model_storage_fn(model_storage) - resource_storage = application_resource_storage( - filesystem_rapi.FileSystemResourceAPI, api_kwargs=dict(directory='.')) - return WorkflowContext( - name=workflow_name, - model_storage=model_storage, - resource_storage=resource_storage, - service_id=service_id_fn(), - workflow_name=self.__class__.__name__, - task_max_attempts=1, - task_retry_interval=1) - - def cleanup(self): - if (self._is_storage_temporary and (self._storage_path is not None) and - os.path.isfile(self._storage_path)): - os.remove(self._storage_path) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py new file mode 100644 index 0000000..1ea60a1 --- /dev/null +++ b/aria/orchestrator/workflow_runner.py @@ -0,0 +1,161 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Workflow runner +""" + +import os +import sys +from datetime import datetime + +from . 
import exceptions +from .context.workflow import WorkflowContext +from .workflows import builtin +from .workflows.core.engine import Engine +from .workflows.executor.process import ProcessExecutor +from ..modeling import models +from ..modeling import utils as modeling_utils +from ..utils.imports import import_fullname + + +DEFAULT_TASK_MAX_ATTEMPTS = 30 +DEFAULT_TASK_RETRY_INTERVAL = 30 + + +class WorkflowRunner(object): + + def __init__(self, workflow_name, service_id, inputs, + model_storage, resource_storage, plugin_manager, + executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS, + task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL): + """ + Manages a single workflow execution on a given service + :param workflow_name: Workflow name + :param service_id: Service id + :param inputs: A key-value dict of inputs for the execution + :param model_storage: Model storage + :param resource_storage: Resource storage + :param plugin_manager: Plugin manager + :param executor: Executor for tasks. Defaults to a ProcessExecutor instance. 
+ :param task_max_attempts: Maximum attempts of repeating each failing task + :param task_retry_interval: Retry interval in between retry attempts of a failing task + """ + + self._model_storage = model_storage + self._resource_storage = resource_storage + self._workflow_name = workflow_name + + # the IDs are stored rather than the models themselves, so this module could be used + # by several threads without raising errors on model objects shared between threads + self._service_id = service_id + + self._validate_workflow_exists_for_service() + + workflow_fn = self._get_workflow_fn() + + execution = self._create_execution_model(inputs) + self._execution_id = execution.id + + workflow_context = WorkflowContext( + name=self.__class__.__name__, + model_storage=self._model_storage, + resource_storage=resource_storage, + service_id=service_id, + execution_id=execution.id, + workflow_name=workflow_name, + task_max_attempts=task_max_attempts, + task_retry_interval=task_retry_interval) + + # transforming the execution inputs to dict, to pass them to the workflow function + execution_inputs_dict = dict(inp.unwrap() for inp in self.execution.inputs.values()) + self._tasks_graph = workflow_fn(ctx=workflow_context, **execution_inputs_dict) + + executor = executor or ProcessExecutor(plugin_manager=plugin_manager) + self._engine = Engine( + executor=executor, + workflow_context=workflow_context, + tasks_graph=self._tasks_graph) + + @property + def execution(self): + return self._model_storage.execution.get(self._execution_id) + + @property + def service(self): + return self._model_storage.service.get(self._service_id) + + def execute(self): + self._engine.execute() + + def cancel(self): + self._engine.cancel_execution() + + def _create_execution_model(self, inputs): + execution = models.Execution( + created_at=datetime.utcnow(), + service=self.service, + workflow_name=self._workflow_name, + inputs={}) + + if self._workflow_name in builtin.BUILTIN_WORKFLOWS: + workflow_inputs = 
dict() # built-in workflows don't have any inputs + else: + workflow_inputs = self.service.workflows[self._workflow_name].inputs + + execution.inputs = modeling_utils.create_inputs(inputs, workflow_inputs) + # TODO: these two following calls should execute atomically + self._validate_no_active_executions(execution) + self._model_storage.execution.put(execution) + return execution + + def _validate_workflow_exists_for_service(self): + if self._workflow_name not in self.service.workflows and \ + self._workflow_name not in builtin.BUILTIN_WORKFLOWS: + raise exceptions.UndeclaredWorkflowError( + 'No workflow policy {0} declared in service {1}' + .format(self._workflow_name, self.service.name)) + + def _validate_no_active_executions(self, execution): + active_executions = [e for e in self.service.executions if e.is_active()] + if active_executions: + raise exceptions.ActiveExecutionsError( + "Can't start execution; Service {0} has an active execution with id {1}" + .format(self.service.name, active_executions[0].id)) + + def _get_workflow_fn(self): + if self._workflow_name in builtin.BUILTIN_WORKFLOWS: + return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX, + self._workflow_name)) + + workflow = self.service.workflows[self._workflow_name] + + # TODO: Custom workflow support needs improvement, currently this code uses internal + # knowledge of the resource storage; Instead, workflows should probably be loaded + # in a similar manner to operation plugins. Also consider passing to import_fullname + # as paths instead of appending to sys path. 
+ service_template_resources_path = os.path.join( + self._resource_storage.service_template.base_path, + str(self.service.service_template.id)) + sys.path.append(service_template_resources_path) + + try: + workflow_fn = import_fullname(workflow.implementation) + except ImportError: + raise exceptions.WorkflowImplementationNotFoundError( + 'Could not find workflow {0} implementation at {1}'.format( + self._workflow_name, workflow.implementation)) + + return workflow_fn http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index 49c584c..82c40c3 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -16,18 +16,16 @@ """ Provides the tasks to be entered into the task graph """ -import copy +from ... import context from ....modeling import models -from ....utils.collections import (OrderedDict, FrozenDict) +from ....modeling import utils as modeling_utils from ....utils.uuid import generate_uuid -from ... import context -from .. import exceptions class BaseTask(object): """ - Abstract task_graph task + Abstract task graph task """ def __init__(self, ctx=None, **kwargs): @@ -56,14 +54,13 @@ class BaseTask(object): class OperationTask(BaseTask): """ - Represents an operation task in the task graph. + Represents an operation task in the task graph """ NAME_FORMAT = '{interface}:{operation}@{type}:{name}' def __init__(self, actor, - actor_type, interface_name, operation_name, inputs=None, @@ -75,122 +72,101 @@ class OperationTask(BaseTask): :meth:`for_relationship`. 
""" + actor_type = type(actor).__name__.lower() + assert isinstance(actor, (models.Node, models.Relationship)) + assert actor_type in ('node', 'relationship') assert interface_name and operation_name super(OperationTask, self).__init__() - operation = None - interface = actor.interfaces.get(interface_name) - if interface is not None: - operation = interface.operations.get(operation_name) - - if operation is None: - raise exceptions.OperationNotFoundException( - 'Could not find operation "{0}" on interface "{1}" for {2} "{3}"' - .format(operation_name, interface_name, actor_type, actor.name)) - - if operation.implementation is None: - raise exceptions.OperationNotFoundException( - 'Empty operation "{0}" on interface "{1}" for {2} "{3}"' - .format(operation_name, interface_name, actor_type, actor.name)) - self.actor = actor - self.actor_type = actor_type - self.interface_name = interface_name - self.operation_name = operation_name - - self.name = OperationTask.NAME_FORMAT.format(type=actor_type, - name=actor.name, - interface=interface_name, - operation=operation_name) self.max_attempts = (self.workflow_context._task_max_attempts if max_attempts is None else max_attempts) self.retry_interval = (self.workflow_context._task_retry_interval if retry_interval is None else retry_interval) self.ignore_failure = (self.workflow_context._task_ignore_failure if ignore_failure is None else ignore_failure) - self.implementation = operation.implementation - self.plugin = operation.plugin + self.interface_name = interface_name + self.operation_name = operation_name - # Wrap inputs - inputs = copy.deepcopy(inputs) if inputs else {} - for k, v in inputs.iteritems(): - if not isinstance(v, models.Parameter): - inputs[k] = models.Parameter.wrap(k, v) + operation = self.actor.interfaces[self.interface_name].operations[self.operation_name] + self.plugin = operation.plugin + self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs) + self.implementation = 
operation.implementation + self.name = OperationTask.NAME_FORMAT.format(type=actor_type, + name=actor.name, + interface=self.interface_name, + operation=self.operation_name) - self.inputs = OrderedDict(operation.inputs) - if inputs: - self.inputs.update(inputs) - self.inputs = FrozenDict(self.inputs) + def __repr__(self): + return self.name @classmethod def for_node(cls, node, interface_name, operation_name, - inputs=None, max_attempts=None, retry_interval=None, - ignore_failure=None): + ignore_failure=None, + inputs=None): """ Creates an operation on a node. :param node: The node on which to run the operation :param interface_name: The interface name :param operation_name: The operation name within the interface - :param inputs: Override the operation's inputs :param max_attempts: The maximum number of attempts in case the operation fails - (if not specified the defaults is taken from the workflow context) + (if not specified the defaults it taken from the workflow context) :param retry_interval: The interval in seconds between attempts when the operation fails - (if not specified the defaults is taken from the workflow context) + (if not specified the defaults it taken from the workflow context) :param ignore_failure: Whether to ignore failures - (if not specified the defaults is taken from the workflow context) + (if not specified the defaults it taken from the workflow context) + :param inputs: Additional operation inputs """ assert isinstance(node, models.Node) return cls( actor=node, - actor_type='node', interface_name=interface_name, operation_name=operation_name, - inputs=inputs, max_attempts=max_attempts, retry_interval=retry_interval, - ignore_failure=ignore_failure) + ignore_failure=ignore_failure, + inputs=inputs) @classmethod def for_relationship(cls, relationship, interface_name, operation_name, - inputs=None, max_attempts=None, retry_interval=None, - ignore_failure=None): + ignore_failure=None, + inputs=None): """ - Creates an operation on a 
relationship. + Creates an operation on a relationship edge. :param relationship: The relationship on which to run the operation :param interface_name: The interface name :param operation_name: The operation name within the interface - :param inputs: Override the operation's inputs :param max_attempts: The maximum number of attempts in case the operation fails - (if not specified the defaults is taken from the workflow context) + (if not specified the defaults it taken from the workflow context) :param retry_interval: The interval in seconds between attempts when the operation fails - (if not specified the defaults is taken from the workflow context) + (if not specified the defaults it taken from the workflow context) :param ignore_failure: Whether to ignore failures - (if not specified the defaults is taken from the workflow context) + (if not specified the defaults it taken from the workflow context) + :param inputs: Additional operation inputs """ assert isinstance(relationship, models.Relationship) return cls( actor=relationship, - actor_type='relationship', interface_name=interface_name, operation_name=operation_name, - inputs=inputs, max_attempts=max_attempts, retry_interval=retry_interval, - ignore_failure=ignore_failure) + ignore_failure=ignore_failure, + inputs=inputs) class WorkflowTask(BaseTask): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/builtin/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/__init__.py b/aria/orchestrator/workflows/builtin/__init__.py index d43a962..8b13c62 100644 --- a/aria/orchestrator/workflows/builtin/__init__.py +++ b/aria/orchestrator/workflows/builtin/__init__.py @@ -24,6 +24,7 @@ from .stop import stop BUILTIN_WORKFLOWS = ('install', 'uninstall', 'start', 'stop') +BUILTIN_WORKFLOWS_PATH_PREFIX = 'aria.orchestrator.workflows.builtin' __all__ = [ 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/builtin/execute_operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/execute_operation.py b/aria/orchestrator/workflows/builtin/execute_operation.py index 348f47a..16504ec 100644 --- a/aria/orchestrator/workflows/builtin/execute_operation.py +++ b/aria/orchestrator/workflows/builtin/execute_operation.py @@ -17,7 +17,7 @@ Builtin execute_operation workflow """ -from ..api.task import OperationTask +from . import utils from ... import workflow @@ -28,7 +28,6 @@ def execute_operation( interface_name, operation_name, operation_kwargs, - allow_kwargs_override, run_by_dependency_order, type_names, node_template_ids, @@ -41,7 +40,6 @@ def execute_operation( :param TaskGraph graph: the graph which will describe the workflow. :param basestring operation: the operation name to execute :param dict operation_kwargs: - :param bool allow_kwargs_override: :param bool run_by_dependency_order: :param type_names: :param node_template_ids: @@ -71,8 +69,7 @@ def execute_operation( node=node, interface_name=interface_name, operation_name=operation_name, - operation_kwargs=operation_kwargs, - allow_kwargs_override=allow_kwargs_override + operation_kwargs=operation_kwargs ) ) @@ -108,21 +105,16 @@ def _create_node_task( node, interface_name, operation_name, - operation_kwargs, - allow_kwargs_override): + operation_kwargs): """ A workflow which executes a single operation :param node: the node instance to install :param basestring operation: the operation name :param dict operation_kwargs: - :param bool allow_kwargs_override: :return: """ - if allow_kwargs_override is not None: - operation_kwargs['allow_kwargs_override'] = allow_kwargs_override - - return OperationTask.for_node( + return utils.create_node_task( node=node, interface_name=interface_name, operation_name=operation_name, 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/builtin/utils.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/utils.py b/aria/orchestrator/workflows/builtin/utils.py index 752fe35..2254d13 100644 --- a/aria/orchestrator/workflows/builtin/utils.py +++ b/aria/orchestrator/workflows/builtin/utils.py @@ -12,26 +12,31 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from ..api.task import OperationTask + +from ..api.task import OperationTask, StubTask from .. import exceptions -def create_node_task(node, interface_name, operation_name): +def create_node_task(node, interface_name, operation_name, **kwargs): """ Returns a new operation task if the operation exists in the node, otherwise returns None. """ try: + if _is_empty_task(node, interface_name, operation_name): + return StubTask() + return OperationTask.for_node(node=node, interface_name=interface_name, - operation_name=operation_name) + operation_name=operation_name, + **kwargs) except exceptions.OperationNotFoundException: # We will skip nodes which do not have the operation return None def create_relationships_tasks( - node, interface_name, source_operation_name=None, target_operation_name=None): + node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs): """ Creates a relationship task (source and target) for all of a node_instance relationships. :param basestring source_operation_name: the relationship operation name. 
@@ -43,21 +48,18 @@ def create_relationships_tasks( """ sub_tasks = [] for relationship in node.outbound_relationships: - try: - relationship_operations = relationship_tasks( - relationship, - interface_name, - source_operation_name=source_operation_name, - target_operation_name=target_operation_name) - sub_tasks.append(relationship_operations) - except exceptions.OperationNotFoundException: - # We will skip relationships which do not have the operation - pass + relationship_operations = relationship_tasks( + relationship, + interface_name, + source_operation_name=source_operation_name, + target_operation_name=target_operation_name, + **kwargs) + sub_tasks.append(relationship_operations) return sub_tasks -def relationship_tasks( - relationship, interface_name, source_operation_name=None, target_operation_name=None): +def relationship_tasks(relationship, interface_name, source_operation_name=None, + target_operation_name=None, **kwargs): """ Creates a relationship task source and target. 
:param Relationship relationship: the relationship instance itself @@ -68,17 +70,33 @@ def relationship_tasks( """ operations = [] if source_operation_name: - operations.append( - OperationTask.for_relationship(relationship=relationship, - interface_name=interface_name, - operation_name=source_operation_name) - ) + try: + if _is_empty_task(relationship, interface_name, source_operation_name): + operations.append(StubTask()) + else: + operations.append( + OperationTask.for_relationship(relationship=relationship, + interface_name=interface_name, + operation_name=source_operation_name, + **kwargs) + ) + except exceptions.OperationNotFoundException: + # We will skip relationships which do not have the operation + pass if target_operation_name: - operations.append( - OperationTask.for_relationship(relationship=relationship, - interface_name=interface_name, - operation_name=target_operation_name) - ) + try: + if _is_empty_task(relationship, interface_name, target_operation_name): + operations.append(StubTask()) + else: + operations.append( + OperationTask.for_relationship(relationship=relationship, + interface_name=interface_name, + operation_name=target_operation_name, + **kwargs) + ) + except exceptions.OperationNotFoundException: + # We will skip relationships which do not have the operation + pass return operations @@ -106,3 +124,15 @@ def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False): graph.add_dependency(dependency, task) else: graph.add_dependency(task, dependencies) + + +def _is_empty_task(actor, interface_name, operation_name): + interface = actor.interfaces.get(interface_name) + if interface: + operation = interface.operations.get(operation_name) + if operation: + return operation.implementation is None + + raise exceptions.OperationNotFoundException( + 'Could not find operation "{0}" on interface "{1}" for {2} "{3}"' + .format(operation_name, interface_name, type(actor).__name__.lower(), actor.name)) 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index f73cade..155d0ee 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -88,12 +88,12 @@ class Engine(logger.LoggerMixin): def _executable_tasks(self): now = datetime.utcnow() return (task for task in self._tasks_iter() - if task.is_waiting and + if task.is_waiting() and task.due_at <= now and not self._task_has_dependencies(task)) def _ended_tasks(self): - return (task for task in self._tasks_iter() if task.has_ended) + return (task for task in self._tasks_iter() if task.has_ended()) def _task_has_dependencies(self, task): return len(self._execution_graph.pred.get(task.id, {})) > 0 @@ -105,7 +105,7 @@ class Engine(logger.LoggerMixin): for _, data in self._execution_graph.nodes_iter(data=True): task = data['task'] if isinstance(task, engine_task.OperationTask): - if not task.model_task.has_ended: + if not task.model_task.has_ended(): self._workflow_context.model.task.refresh(task.model_task) yield task http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py index ba93e21..2b26152 100644 --- a/aria/orchestrator/workflows/core/task.py +++ b/aria/orchestrator/workflows/core/task.py @@ -69,11 +69,9 @@ class StubTask(BaseTask): self.status = models.Task.PENDING self.due_at = datetime.utcnow() - @property def has_ended(self): return self.status in (models.Task.SUCCESS, models.Task.FAILED) - @property def is_waiting(self): return self.status in (models.Task.PENDING, models.Task.RETRYING) 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/exceptions.py b/aria/orchestrator/workflows/exceptions.py index 0ca263f..b5ae496 100644 --- a/aria/orchestrator/workflows/exceptions.py +++ b/aria/orchestrator/workflows/exceptions.py @@ -16,6 +16,8 @@ """ Workflow related Exception classes """ +import os + from .. import exceptions @@ -52,10 +54,10 @@ class ProcessException(ExecutorException): Describes the error in detail """ return ( - 'Command "{error.command}" executed with an error.\n' - 'code: {error.return_code}\n' - 'error: {error.stderr}\n' - 'output: {error.stdout}'.format(error=self)) + 'Command "{error.command}" executed with an error.{0}' + 'code: {error.return_code}{0}' + 'error: {error.stderr}{0}' + 'output: {error.stdout}'.format(os.linesep, error=self)) class AriaEngineError(exceptions.AriaError): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/executor/celery.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/celery.py b/aria/orchestrator/workflows/executor/celery.py index baa0375..7bd9b7c 100644 --- a/aria/orchestrator/workflows/executor/celery.py +++ b/aria/orchestrator/workflows/executor/celery.py @@ -44,7 +44,7 @@ class CeleryExecutor(BaseExecutor): def execute(self, task): self._tasks[task.id] = task - inputs = dict((k, v.value) for k, v in task.inputs.iteritems()) + inputs = dict(inp.unwrap() for inp in task.inputs.values()) inputs['ctx'] = task.context self._results[task.id] = self._app.send_task( task.operation_mapping, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/executor/dry.py ---------------------------------------------------------------------- diff --git 
a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py new file mode 100644 index 0000000..d894b25 --- /dev/null +++ b/aria/orchestrator/workflows/executor/dry.py @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Dry executor +""" + +from datetime import datetime + +from .base import BaseExecutor + + +class DryExecutor(BaseExecutor): + """ + Executor which dry runs tasks - prints task information without causing any side effects + """ + + def execute(self, task): + # updating the task manually instead of calling self._task_started(task), + # to avoid any side effects raising that event might cause + with task._update(): + task.started_at = datetime.utcnow() + task.status = task.STARTED + + actor_type = type(task.actor).__name__.lower() + implementation = '{0} > '.format(task.plugin) if task.plugin else '' + implementation += task.implementation + inputs = dict(inp.unwrap() for inp in task.inputs.values()) + + task.context.logger.info( + 'Executing {actor_type} {task.actor.name} operation {task.interface_name} ' + '{task.operation_name}: {implementation} (Inputs: {inputs})' + .format(actor_type=actor_type, task=task, implementation=implementation, inputs=inputs)) + + # updating the task manually instead of calling self._task_succeeded(task), + # to avoid any side effects raising that event might cause + with task._update(): + task.ended_at = datetime.utcnow() + task.status = task.SUCCESS http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index f814c4d..851d78e 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -148,7 +148,7 @@ class ProcessExecutor(base.BaseExecutor): return { 'task_id': task.id, 'implementation': task.implementation, - 'operation_inputs': dict((k, v.value) for k, v in task.inputs.iteritems()), + 'operation_inputs': dict(inp.unwrap() for inp in task.inputs.values()), 'port': self._server_port, 'context': task.context.serialization_dict, 
} http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py index 1a49af5..f422592 100644 --- a/aria/orchestrator/workflows/executor/thread.py +++ b/aria/orchestrator/workflows/executor/thread.py @@ -21,6 +21,7 @@ import Queue import threading from aria.utils import imports + from .base import BaseExecutor @@ -58,7 +59,7 @@ class ThreadExecutor(BaseExecutor): self._task_started(task) try: task_func = imports.load_attribute(task.implementation) - inputs = dict((k, v.value) for k, v in task.inputs.iteritems()) + inputs = dict(inp.unwrap() for inp in task.inputs.values()) task_func(ctx=task.context, **inputs) self._task_succeeded(task) except BaseException as e: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/parser/consumption/__init__.py ---------------------------------------------------------------------- diff --git a/aria/parser/consumption/__init__.py b/aria/parser/consumption/__init__.py index 7da8490..8f6d2b6 100644 --- a/aria/parser/consumption/__init__.py +++ b/aria/parser/consumption/__init__.py @@ -17,10 +17,21 @@ from .exceptions import ConsumerException from .context import ConsumptionContext from .style import Style -from .consumer import Consumer, ConsumerChain +from .consumer import ( + Consumer, + ConsumerChain +) from .presentation import Read from .validation import Validate -from .modeling import ServiceTemplate, Types, ServiceInstance +from .modeling import ( + ServiceTemplate, + Types, + ServiceInstance, + FindHosts, + ConfigureOperations, + SatisfyRequirements, + ValidateCapabilities +) from .inputs import Inputs __all__ = ( @@ -34,4 +45,7 @@ __all__ = ( 'ServiceTemplate', 'Types', 'ServiceInstance', - 'Inputs') + 'Inputs', + 'SatisfyRequirements', + 'ValidateCapabilities' +) 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/parser/consumption/modeling.py ---------------------------------------------------------------------- diff --git a/aria/parser/consumption/modeling.py b/aria/parser/consumption/modeling.py index 6c616b4..771fd7f 100644 --- a/aria/parser/consumption/modeling.py +++ b/aria/parser/consumption/modeling.py @@ -106,7 +106,8 @@ class InstantiateServiceInstance(Consumer): 'template') return - self.context.modeling.template.instantiate(None) + self.context.modeling.template.instantiate(None, None, + inputs=dict(self.context.modeling.inputs)) class CoerceServiceInstanceValues(Consumer): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/storage/core.py ---------------------------------------------------------------------- diff --git a/aria/storage/core.py b/aria/storage/core.py index 8302fc9..8caca66 100644 --- a/aria/storage/core.py +++ b/aria/storage/core.py @@ -38,7 +38,7 @@ API: * StorageDriver - class, abstract model implementation. """ -from aria.logger import LoggerMixin +from aria.logger import LoggerMixin, NullHandler from . import sql_mapi __all__ = ( @@ -71,6 +71,10 @@ class Storage(LoggerMixin): :param kwargs: """ super(Storage, self).__init__(**kwargs) + # Set the logger handler of any storage object to NullHandler. + # This is since the absence of a handler shows up while using the CLI in the form of: + # `No handlers could be found for logger "aria.ResourceStorage"`. 
+ self.logger.addHandler(NullHandler()) self.api = api_cls self.registered = {} self._initiator = initiator http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/storage/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/storage/exceptions.py b/aria/storage/exceptions.py index f982f63..3f0ecec 100644 --- a/aria/storage/exceptions.py +++ b/aria/storage/exceptions.py @@ -23,3 +23,7 @@ class StorageError(exceptions.AriaError): General storage exception """ pass + + +class NotFoundError(StorageError): + pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/storage/instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py index 138432a..cf2a365 100644 --- a/aria/storage/instrumentation.py +++ b/aria/storage/instrumentation.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os import copy import json @@ -189,9 +190,9 @@ def apply_tracked_changes(tracked_changes, model): if not value: del successfully_updated_changes[key] model.logger.error( - 'Registering all the changes to the storage has failed. \n' - 'The successful updates were: \n ' - '{0}'.format(json.dumps(successfully_updated_changes, indent=4))) + 'Registering all the changes to the storage has failed. 
{0}' + 'The successful updates were: {0} ' + '{1}'.format(os.linesep, json.dumps(successfully_updated_changes, indent=4))) raise http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/storage/sql_mapi.py ---------------------------------------------------------------------- diff --git a/aria/storage/sql_mapi.py b/aria/storage/sql_mapi.py index 8d34bb4..730d007 100644 --- a/aria/storage/sql_mapi.py +++ b/aria/storage/sql_mapi.py @@ -59,7 +59,7 @@ class SQLAlchemyModelAPI(api.ModelAPI): result = query.first() if not result: - raise exceptions.StorageError( + raise exceptions.NotFoundError( 'Requested `{0}` with ID `{1}` was not found' .format(self.model_cls.__name__, entry_id) ) @@ -69,13 +69,13 @@ class SQLAlchemyModelAPI(api.ModelAPI): assert hasattr(self.model_cls, 'name') result = self.list(include=include, filters={'name': entry_name}) if not result: - raise exceptions.StorageError( - 'Requested {0} with NAME `{1}` was not found' + raise exceptions.NotFoundError( + 'Requested {0} with name `{1}` was not found' .format(self.model_cls.__name__, entry_name) ) elif len(result) > 1: raise exceptions.StorageError( - 'Requested {0} with NAME `{1}` returned more than 1 value' + 'Requested {0} with name `{1}` returned more than 1 value' .format(self.model_cls.__name__, entry_name) ) else: @@ -92,10 +92,8 @@ class SQLAlchemyModelAPI(api.ModelAPI): results, total, size, offset = self._paginate(query, pagination) return ListResult( - items=results, - metadata=dict(total=total, - size=size, - offset=offset) + dict(total=total, size=size, offset=offset), + results ) def iter(self, @@ -406,19 +404,11 @@ def init_storage(base_dir, filename='db.sqlite'): return dict(engine=engine, session=session) -class ListResult(object): +class ListResult(list): """ a ListResult contains results about the requested items. 
""" - def __init__(self, items, metadata): - self.items = items + def __init__(self, metadata, *args, **qwargs): + super(ListResult, self).__init__(*args, **qwargs) self.metadata = metadata - - def __len__(self): - return len(self.items) - - def __iter__(self): - return iter(self.items) - - def __getitem__(self, item): - return self.items[item] + self.items = self http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/utils/application.py ---------------------------------------------------------------------- diff --git a/aria/utils/application.py b/aria/utils/application.py deleted file mode 100644 index 2f40825..0000000 --- a/aria/utils/application.py +++ /dev/null @@ -1,294 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Convenience storage related tools. 
-# TODO rename module name -""" - -import json -import os -import shutil -import tarfile -import tempfile -from datetime import datetime - -from aria.storage.exceptions import StorageError -from aria.logger import LoggerMixin - - -class StorageManager(LoggerMixin): - """ - Convenience wrapper to simplify work with the lower level storage mechanism - """ - - def __init__( - self, - model_storage, - resource_storage, - blueprint_path, - blueprint_id, - blueprint_plan, - deployment_id, - deployment_plan, - **kwargs): - super(StorageManager, self).__init__(**kwargs) - self.model_storage = model_storage - self.resource_storage = resource_storage - self.blueprint_path = blueprint_path - self.blueprint_id = blueprint_id - self.blueprint_plan = blueprint_plan - self.deployment_id = deployment_id - self.deployment_plan = deployment_plan - - @classmethod - def from_deployment( - cls, - model_storage, - resource_storage, - deployment_id, - deployment_plan): - """ - Create a StorageManager from a deployment - """ - return cls( - model_storage=model_storage, - resource_storage=resource_storage, - deployment_id=deployment_id, - deployment_plan=deployment_plan, - blueprint_path=None, - blueprint_plan=None, - blueprint_id=None - ) - - @classmethod - def from_blueprint( - cls, - model_storage, - resource_storage, - blueprint_path, - blueprint_id, - blueprint_plan): - """ - Create a StorageManager from a blueprint - """ - return cls( - model_storage=model_storage, - resource_storage=resource_storage, - blueprint_path=blueprint_path, - blueprint_plan=blueprint_plan, - blueprint_id=blueprint_id, - deployment_id=None, - deployment_plan=None) - - def create_blueprint_storage(self, source, main_file_name=None): - """ - create blueprint model & resource - """ - assert self.blueprint_path and self.blueprint_id - assert hasattr(self.resource_storage, 'blueprint') - assert hasattr(self.model_storage, 'blueprint') - - self.logger.debug('creating blueprint resource storage entry') - 
self.resource_storage.service_template.upload( - entry_id=self.blueprint_id, - source=os.path.dirname(source)) - self.logger.debug('created blueprint resource storage entry') - - self.logger.debug('creating blueprint model storage entry') - now = datetime.utcnow() - blueprint = self.model_storage.service_template.model_cls( - plan=self.blueprint_plan, - id=self.blueprint_id, - description=self.blueprint_plan.get('description'), - created_at=now, - updated_at=now, - main_file_name=main_file_name, - ) - self.model_storage.service_template.put(blueprint) - self.logger.debug('created blueprint model storage entry') - - def create_nodes_storage(self): - """ - create nodes model - """ - assert self.blueprint_path and self.blueprint_id - assert hasattr(self.model_storage, 'node') - assert hasattr(self.model_storage, 'relationship') - - for node in self.blueprint_plan['nodes']: - node_copy = node.copy() - for field in ('name', - 'deployment_plugins_to_install', - 'interfaces', - 'instances'): - node_copy.pop(field) - scalable = node_copy.pop('capabilities')['scalable']['properties'] - for index, relationship in enumerate(node_copy['relationships']): - relationship = self.model_storage.relationship.model_cls(**relationship) - self.model_storage.relationship.put(relationship) - node_copy['relationships'][index] = relationship - - node_copy = self.model_storage.node.model_cls( - blueprint_id=self.blueprint_id, - planned_number_of_instances=scalable['current_instances'], - deploy_number_of_instances=scalable['default_instances'], - min_number_of_instances=scalable['min_instances'], - max_number_of_instances=scalable['max_instances'], - number_of_instances=scalable['current_instances'], - **node_copy) - self.model_storage.node.put(node_copy) - - def create_deployment_storage(self): - """ - create deployment model & resource - """ - assert self.deployment_id and self.deployment_plan - - assert hasattr(self.resource_storage, 'blueprint') - assert hasattr(self.resource_storage, 
'deployment') - assert hasattr(self.model_storage, 'deployment') - - self.logger.debug('creating deployment resource storage entry') - temp_dir = tempfile.mkdtemp() - try: - self.resource_storage.service_template.download( - entry_id=self.blueprint_id, - destination=temp_dir) - self.resource_storage.service_instance.upload( - entry_id=self.deployment_id, - source=temp_dir) - finally: - shutil.rmtree(temp_dir, ignore_errors=True) - self.logger.debug('created deployment resource storage entry') - - self.logger.debug('creating deployment model storage entry') - now = datetime.utcnow() - deployment = self.model_storage.service_instance.model_cls( - id=self.deployment_id, - blueprint_id=self.blueprint_id, - description=self.deployment_plan['description'], - workflows=self.deployment_plan['workflows'], - inputs=self.deployment_plan['inputs'], - policy_types=self.deployment_plan['policy_types'], - policy_triggers=self.deployment_plan['policy_triggers'], - groups=self.deployment_plan['groups'], - scaling_groups=self.deployment_plan['scaling_groups'], - outputs=self.deployment_plan['outputs'], - created_at=now, - updated_at=now - ) - self.model_storage.service_instance.put(deployment) - self.logger.debug('created deployment model storage entry') - - def create_node_instances_storage(self): - """ - create node_instances model - """ - assert self.deployment_id and self.deployment_plan - assert hasattr(self.model_storage, 'node_instance') - assert hasattr(self.model_storage, 'relationship_instance') - - self.logger.debug('creating node-instances model storage entries') - for node_instance in self.deployment_plan['node_instances']: - node_model = self.model_storage.node.get(node_instance['node_id']) - relationship_instances = [] - - for index, relationship_instance in enumerate(node_instance['relationships']): - relationship_instance_model = self.model_storage.relationship.model_cls( - relationship=node_model.relationships[index], - 
target_name=relationship_instance['target_name'], - type=relationship_instance['type'], - target_id=relationship_instance['target_id']) - relationship_instances.append(relationship_instance_model) - self.model_storage.relationship.put(relationship_instance_model) - - node_instance_model = self.model_storage.node.model_cls( - node=node_model, - id=node_instance['id'], - runtime_properties={}, - state=self.model_storage.node.model_cls.UNINITIALIZED, - deployment_id=self.deployment_id, - version='1.0', - relationship_instances=relationship_instances) - - self.model_storage.node.put(node_instance_model) - self.logger.debug('created node-instances model storage entries') - - def create_plugin_storage(self, plugin_id, source): - """ - create plugin model & resource - """ - assert hasattr(self.model_storage, 'plugin') - assert hasattr(self.resource_storage, 'plugin') - - self.logger.debug('creating plugin resource storage entry') - self.resource_storage.plugin.upload(entry_id=plugin_id, source=source) - self.logger.debug('created plugin resource storage entry') - - self.logger.debug('creating plugin model storage entry') - plugin = _load_plugin_from_archive(source) - build_props = plugin.get('build_server_os_properties') - now = datetime.utcnow() - - plugin = self.model_storage.plugin.model_cls( - id=plugin_id, - package_name=plugin.get('package_name'), - package_version=plugin.get('package_version'), - archive_name=plugin.get('archive_name'), - package_source=plugin.get('package_source'), - supported_platform=plugin.get('supported_platform'), - distribution=build_props.get('distribution'), - distribution_version=build_props.get('distribution_version'), - distribution_release=build_props.get('distribution_release'), - wheels=plugin.get('wheels'), - excluded_wheels=plugin.get('excluded_wheels'), - supported_py_versions=plugin.get('supported_python_versions'), - uploaded_at=now - ) - self.model_storage.plugin.put(plugin) - self.logger.debug('created plugin model storage 
entry') - - -def _load_plugin_from_archive(tar_source): - if not tarfile.is_tarfile(tar_source): - # TODO: go over the exceptions - raise StorageError( - 'the provided tar archive can not be read.') - - with tarfile.open(tar_source) as tar: - tar_members = tar.getmembers() - # a wheel plugin will contain exactly one sub directory - if not tar_members: - raise StorageError( - 'archive file structure malformed. expecting exactly one ' - 'sub directory; got none.') - package_json_path = os.path.join(tar_members[0].name, - 'package.json') - try: - package_member = tar.getmember(package_json_path) - except KeyError: - raise StorageError("'package.json' was not found under {0}" - .format(package_json_path)) - try: - package_json = tar.extractfile(package_member) - except tarfile.ExtractError as e: - raise StorageError(str(e)) - try: - return json.load(package_json) - except ValueError as e: - raise StorageError("'package.json' is not a valid json: " - "{json_str}. error is {error}" - .format(json_str=package_json.read(), error=str(e))) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/utils/archive.py ---------------------------------------------------------------------- diff --git a/aria/utils/archive.py b/aria/utils/archive.py new file mode 100644 index 0000000..63d9004 --- /dev/null +++ b/aria/utils/archive.py @@ -0,0 +1,63 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import errno
import os
import shutil
import tarfile
import tempfile
import traceback as tb
import zipfile
from contextlib import closing


# ---------------------------------------------------------------------------
# aria/utils/archive.py
# ---------------------------------------------------------------------------

def is_archive(source):
    """
    Tell whether ``source`` is a tar or a zip archive.

    :param source: path of the file to inspect
    :return: ``True`` if either ``tarfile`` or ``zipfile`` recognizes the file
    """
    return tarfile.is_tarfile(source) or zipfile.is_zipfile(source)


def extract_archive(source):
    """
    Extract a tar or zip archive into a freshly created temporary directory.

    :param source: path of the archive
    :return: path of the directory the archive was extracted into
    :raises ValueError: if ``source`` is neither a tar nor a zip archive
    """
    if tarfile.is_tarfile(source):
        return untar(source)
    if zipfile.is_zipfile(source):
        return unzip(source)
    raise ValueError(
        'Unsupported archive type provided or archive is not valid: {0}.'.format(source))


def tar(source, destination):
    """
    Create a gzip-compressed tar archive of ``source`` at ``destination``.

    The archive holds a single top-level entry named after the last path
    component of ``source``.

    :param source: path of the file or directory to archive
    :param destination: path of the archive to create
    :return: ``destination`` (mirrors :func:`zip`)
    """
    with closing(tarfile.open(destination, 'w:gz')) as tar_archive:
        tar_archive.add(source, arcname=os.path.basename(source))
    return destination


def _reject_unsafe_member_names(names):
    """
    Raise ``ValueError`` if an archive member name is absolute or climbs out of
    the extraction root through ``..`` components.

    Only member *names* are checked; link targets are not inspected.
    """
    for name in names:
        normalized = os.path.normpath(name)
        escapes = (normalized == os.pardir or
                   normalized.startswith(os.pardir + os.sep))
        if os.path.isabs(normalized) or escapes:
            raise ValueError(
                'Archive member would be extracted outside of the destination '
                'directory: {0}'.format(name))


def untar(archive, destination=None):
    """
    Extract a tar archive.

    :param archive: path of the tar archive
    :param destination: directory to extract into; when falsy, a new temporary
                        directory is created
    :return: path of the directory the archive was extracted into
    :raises ValueError: if a member name is absolute or escapes ``destination``
    """
    with closing(tarfile.open(name=archive)) as tar_archive:
        members = tar_archive.getmembers()
        # Validate before anything is written (and before a temporary directory
        # is created), so a rejected archive leaves nothing behind.
        _reject_unsafe_member_names(member.name for member in members)
        if not destination:
            destination = tempfile.mkdtemp()
        tar_archive.extractall(path=destination, members=members)
    return destination


def zip(source, destination):
    """
    Create a zip archive of the files found under the ``source`` directory.

    Entries are stored relative to the parent directory of ``source``, so the
    last path component of ``source`` is the top-level directory inside the
    archive. Only files are written; directories that contain no files get no
    entry.

    NOTE: the name shadows the ``zip`` builtin; it is kept because it is the
    public name of this helper.

    :param source: path of the directory to archive
    :param destination: path of the archive to create
    :return: ``destination``
    """
    source_dir = os.path.dirname(source)
    with closing(zipfile.ZipFile(destination, 'w')) as zip_file:
        for root, _, files in os.walk(source):
            for filename in files:
                file_path = os.path.join(root, filename)
                zip_file.write(file_path, os.path.relpath(file_path, source_dir))
    return destination


def unzip(archive, destination=None):
    """
    Extract a zip archive.

    :param archive: path of the zip archive
    :param destination: directory to extract into; when falsy, a new temporary
                        directory is created
    :return: path of the directory the archive was extracted into
    """
    if not destination:
        destination = tempfile.mkdtemp()
    with closing(zipfile.ZipFile(archive, 'r')) as zip_file:
        zip_file.extractall(destination)
    return destination


# ---------------------------------------------------------------------------
# aria/utils/exceptions.py
# ---------------------------------------------------------------------------

def get_exception_as_string(exc_type, exc_val, traceback):
    """
    Render exception information as the text ``traceback.print_exception``
    would print for it.

    :param exc_type: exception class
    :param exc_val: exception instance
    :param traceback: traceback object, or ``None`` to render only the final
                      ``Type: message`` line
    :return: the formatted exception as a single string
    """
    # The parameter is called ``traceback``, hence the module alias ``tb``.
    # Arguments are passed positionally because the keyword names of the
    # ``traceback`` module functions differ between Python versions.
    return ''.join(tb.format_exception(exc_type, exc_val, traceback))


# ---------------------------------------------------------------------------
# aria/utils/file.py
# ---------------------------------------------------------------------------

def remove_if_exists(path):
    """
    Remove the file, symbolic link or directory tree at ``path``; do nothing if
    nothing is there.

    A symbolic link is unlinked itself; whatever it points to is left alone.

    :param path: path to remove
    :raises OSError: if removal fails for any reason other than the path not
                     existing
    """
    try:
        # ``islink`` comes first: ``isdir`` follows links and ``shutil.rmtree``
        # refuses to operate on a symbolic link.
        if os.path.islink(path) or os.path.isfile(path):
            os.remove(path)
        elif os.path.isdir(path):
            shutil.rmtree(path)
    except OSError as e:
        # The path may disappear between the check and the removal; that is the
        # outcome we wanted anyway. Anything else is a real error.
        if e.errno != errno.ENOENT:
            raise


# ---------------------------------------------------------------------------
# aria/utils/formatting.py
# ---------------------------------------------------------------------------

# ``unicode`` on Python 2, ``str`` on Python 3.
_TEXT_TYPE = type(u'')


def _decode_value(value):
    """
    Convert one value: text is encoded to UTF-8 bytes, lists and dicts are
    converted recursively, anything else is returned unchanged.
    """
    if isinstance(value, _TEXT_TYPE):
        return value.encode('utf-8')
    if isinstance(value, list):
        return decode_list(value)
    if isinstance(value, dict):
        return decode_dict(value)
    return value


def decode_list(data):
    """
    Return a copy of ``data`` in which every Unicode string, at any depth of
    nested lists and dicts, is replaced by its UTF-8 encoded byte string.

    Despite the name, text is *encoded* here. Containers other than ``list``
    and ``dict`` (tuples, for instance) are not descended into.

    :param data: list to convert
    :return: new list; ``data`` itself is not modified
    """
    return [_decode_value(item) for item in data]


def decode_dict(data):
    """
    Return a copy of ``data`` in which every Unicode key and every Unicode
    string value, at any depth of nested lists and dicts, is replaced by its
    UTF-8 encoded byte string.

    :param data: dict to convert
    :return: new dict; ``data`` itself is not modified
    """
    decoded_dict = {}
    for key, value in data.items():
        if isinstance(key, _TEXT_TYPE):
            key = key.encode('utf-8')
        decoded_dict[key] = _decode_value(value)
    return decoded_dict
