Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-9-API-for-operation-context 2ac31303f -> f59adc9c9
review fixes Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/f59adc9c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/f59adc9c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/f59adc9c Branch: refs/heads/ARIA-9-API-for-operation-context Commit: f59adc9c963fbbff7c5c114d87eceb68014b4048 Parents: 2ac3130 Author: mxmrlv <mxm...@gmail.com> Authored: Tue Nov 15 15:57:06 2016 +0200 Committer: mxmrlv <mxm...@gmail.com> Committed: Tue Nov 15 15:57:06 2016 +0200 ---------------------------------------------------------------------- aria/context/common.py | 45 ++---- aria/context/operation.py | 12 +- aria/context/toolbelt.py | 72 ++++------ aria/context/workflow.py | 3 +- aria/decorators.py | 6 +- aria/events/builtin_event_handler.py | 8 +- aria/logger.py | 2 +- aria/workflows/api/task.py | 80 +++++------ aria/workflows/builtin/workflows.py | 54 ++++---- aria/workflows/core/task.py | 27 ++-- aria/workflows/core/translation.py | 4 +- aria/workflows/executor/celery.py | 2 +- tests/context/__init__.py | 7 - tests/context/test_operation.py | 69 +++++----- tests/context/test_toolbelt.py | 137 +++++++++++++------ tests/workflows/api/test_task.py | 28 ++-- tests/workflows/core/test_engine.py | 2 +- tests/workflows/core/test_task.py | 6 +- .../test_task_graph_into_exececution_graph.py | 9 +- tests/workflows/executor/test_executor.py | 2 +- 20 files changed, 294 insertions(+), 281 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f59adc9c/aria/context/common.py ---------------------------------------------------------------------- diff --git a/aria/context/common.py b/aria/context/common.py index 6f7c951..47ec6b2 100644 --- a/aria/context/common.py +++ b/aria/context/common.py @@ -15,6 +15,7 @@ """ A common context for both workflow and operation """ +import os from uuid import uuid4 from .. import logger @@ -34,7 +35,6 @@ class BaseContext(logger.LoggerMixin): deployment_id, workflow_id, execution_id=None, - parameters=None, task_max_attempts=1, task_retry_interval=0, task_ignore_failure=False, @@ -47,7 +47,6 @@ class BaseContext(logger.LoggerMixin): self._deployment_id = deployment_id self._workflow_id = workflow_id self._execution_id = execution_id or str(uuid4()) - self._parameters = parameters or {} self._task_max_attempts = task_max_attempts self._task_retry_interval = task_retry_interval self._task_ignore_failure = task_ignore_failure @@ -107,14 +106,6 @@ class BaseContext(logger.LoggerMixin): self.model.execution.store(value) @property - def parameters(self): - """ - The operation parameters - :return: - """ - return self._parameters - - @property def name(self): """ The operation name @@ -130,34 +121,28 @@ class BaseContext(logger.LoggerMixin): """ return self._id - def download_blueprint_resource(self, destination, path=None): + def download_resource(self, destination, path=None): """ Download a blueprint resource from the resource storage """ - return self.resource.blueprint.download( + return_value = self.resource.deployment.download( entry_id=self.blueprint.id, destination=destination, path=path) - - def download_deployment_resource(self, destination, path=None): - """ - Download a deployment resource from the resource storage - """ - return self.resource.deployment.download( - entry_id=self._deployment_id, - destination=destination, - path=path) + if not path.isdir(destination) or len(os.listdir(destination)) > 0: + return self.resource.blueprint.download(entry_id=self._deployment_id, + destination=destination, + path=path) + else: + return return_value @lru_cache() - def get_deployment_resource_data(self, path=None): + def get_resource(self, path=None): """ Read a deployment resource as string from the resource storage """ - return self.resource.deployment.data(entry_id=self._deployment_id, path=path) - - @lru_cache() - def get_blueprint_resource_data(self, path=None): - """ - Read a blueprint resource as string from the resource storage - """ - return self.resource.blueprint.data(entry_id=self._deployment_id, path=path) + return_value = self.resource.deployment.data(entry_id=self._deployment_id, path=path) + if return_value: + return self.resource.blueprint.data(entry_id=self._deployment_id, path=path) + else: + return return_value http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f59adc9c/aria/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/context/operation.py b/aria/context/operation.py index e0e191d..bf3686d 100644 --- a/aria/context/operation.py +++ b/aria/context/operation.py @@ -34,16 +34,14 @@ class BaseOperationContext(BaseContext): deployment_id=workflow_context._deployment_id, workflow_id=workflow_context._workflow_id, execution_id=workflow_context._execution_id, - parameters=workflow_context.parameters, **kwargs) - self._workflow_context = workflow_context self._task_model = task self._actor = self.task.actor def __repr__(self): - details = ', '.join( - '{0}={1}'.format(key, value) - for key, value in self.task.operation_details.items()) + details = 'operation_mapping={task.operation_mapping}; ' \ + 'operation_inputs={task.inputs}'\ + .format(task=self.task) return '{name}({0})'.format(details, name=self.name) @property @@ -81,7 +79,7 @@ class RelationshipOperationContext(BaseOperationContext): Context for relationship based operations. """ @property - def node(self): + def source_node(self): """ The source node :return: @@ -89,7 +87,7 @@ class RelationshipOperationContext(BaseOperationContext): return self.model.node.get(self.relationship.source_id) @property - def node_instance(self): + def source_node_instance(self): """ The source node instance :return: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f59adc9c/aria/context/toolbelt.py ---------------------------------------------------------------------- diff --git a/aria/context/toolbelt.py b/aria/context/toolbelt.py index 56d594e..30e9076 100644 --- a/aria/context/toolbelt.py +++ b/aria/context/toolbelt.py @@ -13,47 +13,19 @@ # See the License for the specific language governing permissions and # limitations under the License. """ -provides with different tools when working with the operation. +Provides with different tools for operations. """ -from contextlib import contextmanager - -from .. import exceptions from . import operation -class _BaseToolbelt(object): +class NodeToolBelt(object): """ - Base tool belt + Node operation related tool belt """ - def __init__(self): - self._op_context = None - self._workflow_context = None - - @contextmanager - def use(self, operation_context): - """ - Context manager which switches the current toolbelt with the supplied ctx - :param operation_context: - :return: - """ - assert isinstance(operation_context, operation.BaseOperationContext) - _op_context = self._op_context - _workflow_context = self._workflow_context - + def __init__(self, operation_context): self._op_context = operation_context - self._workflow_context = operation_context._workflow_context - try: - yield self - finally: - self._op_context = _op_context - self._workflow_context = _workflow_context - -class _NodeToolBelt(_BaseToolbelt): - """ - Node operation related tool belt - """ @property def relationships_to_me(self): """ @@ -61,12 +33,30 @@ class _NodeToolBelt(_BaseToolbelt): :return: """ assert isinstance(self._op_context, operation.NodeOperationContext) - for node_instance in self._workflow_context.node_instances: + node_instances = self._op_context.model.node_instance.iter( + filters={'deployment_id': self._op_context.deployment.id} + ) + for node_instance in node_instances: for relationship_instance in node_instance.relationship_instances: if relationship_instance.target_id == self._op_context.node_instance.id: yield relationship_instance @property + def dependent_node_instances(self): + """ + Any node instance which has a relationship to the current node instance. + :return: + """ + assert isinstance(self._op_context, operation.NodeOperationContext) + node_instances = self._op_context.model.node_instance.iter( + filters={'deployment_id': self._op_context.deployment.id} + ) + for node_instance in node_instances: + for relationship_instance in node_instance.relationship_instances: + if relationship_instance.target_id == self._op_context.node_instance.id: + yield node_instance + + @property def host_ip(self): """ The host ip of the current node @@ -74,18 +64,16 @@ class _NodeToolBelt(_BaseToolbelt): """ assert isinstance(self._op_context, operation.NodeOperationContext) host_id = self._op_context._actor.host_id - host_instance = self._workflow_context.model.node_instance.get(host_id) + host_instance = self._op_context.model.node_instance.get(host_id) return host_instance.runtime_properties.get('ip') -class _RelationshipToolBelt(_BaseToolbelt): +class RelationshipToolBelt(object): """ Relationship operation related tool belt """ - pass - -_operation_toolbelt = _NodeToolBelt() -_relationship_toolbelt = _RelationshipToolBelt() + def __init__(self, operation_context): + self._op_context = operation_context def toolbelt(operation_context): @@ -95,8 +83,8 @@ def toolbelt(operation_context): :return: """ if isinstance(operation_context, operation.NodeOperationContext): - return _operation_toolbelt.use(operation_context) + return NodeToolBelt(operation_context) elif isinstance(operation_context, operation.RelationshipOperationContext): - return _relationship_toolbelt.use(operation_context) + return RelationshipToolBelt(operation_context) else: - raise exceptions.TaskException("Operation context not supported") + raise RuntimeError("Operation context not supported") http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f59adc9c/aria/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/context/workflow.py b/aria/context/workflow.py index 97bd7f9..0495bdc 100644 --- a/aria/context/workflow.py +++ b/aria/context/workflow.py @@ -36,8 +36,9 @@ class WorkflowContext(BaseContext): """ Context object used during workflow creation and execution """ - def __init__(self, *args, **kwargs): + def __init__(self, parameters=None, *args, **kwargs): super(WorkflowContext, self).__init__(*args, **kwargs) + self.parameters = parameters or {} # TODO: execution creation should happen somewhere else # should be moved there, when such logical place exists try: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f59adc9c/aria/decorators.py ---------------------------------------------------------------------- diff --git a/aria/decorators.py b/aria/decorators.py index 8a400fb..308f70c 100644 --- a/aria/decorators.py +++ b/aria/decorators.py @@ -60,10 +60,8 @@ def operation(func=None, toolbelt=False, suffix_template=''): @wraps(func) def _wrapper(**func_kwargs): if toolbelt: - with context.toolbelt(func_kwargs.get('ctx')) as operation_toolbelt: - func_kwargs.setdefault('toolbelt', operation_toolbelt) - validate_function_arguments(func, func_kwargs) - return func(**func_kwargs) + operation_toolbelt = context.toolbelt(func_kwargs.get('ctx')) + func_kwargs.setdefault('toolbelt', operation_toolbelt) validate_function_arguments(func, func_kwargs) return func(**func_kwargs) return _wrapper http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f59adc9c/aria/events/builtin_event_handler.py ---------------------------------------------------------------------- diff --git a/aria/events/builtin_event_handler.py b/aria/events/builtin_event_handler.py index 2f9a3be..c5cccfe 100644 --- a/aria/events/builtin_event_handler.py +++ b/aria/events/builtin_event_handler.py @@ -41,20 +41,20 @@ from . import ( @sent_task_signal.connect def _task_sent(task, *args, **kwargs): - with task.update(): + with task._update(): task.status = task.SENT @start_task_signal.connect def _task_started(task, *args, **kwargs): - with task.update(): + with task._update(): task.started_at = datetime.utcnow() task.status = task.STARTED @on_failure_task_signal.connect def _task_failed(task, *args, **kwargs): - with task.update(): + with task._update(): should_retry = ( (task.retry_count < task.max_attempts - 1 or task.max_attempts == task.INFINITE_RETRIES) and @@ -72,7 +72,7 @@ def _task_failed(task, *args, **kwargs): @on_success_task_signal.connect def _task_succeeded(task, *args, **kwargs): - with task.update(): + with task._update(): task.ended_at = datetime.utcnow() task.status = task.SUCCESS http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f59adc9c/aria/logger.py ---------------------------------------------------------------------- diff --git a/aria/logger.py b/aria/logger.py index 0fbf6cc..0002cb5 100644 --- a/aria/logger.py +++ b/aria/logger.py @@ -39,7 +39,7 @@ class LoggerMixin(object): self.logger_name = self.logger_name or self.__class__.__name__ self.logger = logging.getLogger('{0}.{1}'.format(_base_logger.name, self.logger_name)) self.logger.setLevel(self.logger_level) - super(LoggerMixin, self).__init__() + super(LoggerMixin, self).__init__(*args, **kwargs) @classmethod def with_logger( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f59adc9c/aria/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/workflows/api/task.py b/aria/workflows/api/task.py index 8374f4e..d6710ad 100644 --- a/aria/workflows/api/task.py +++ b/aria/workflows/api/task.py @@ -52,11 +52,14 @@ class BaseTask(object): return self._workflow_context -class _OperationTask(BaseTask): +class OperationTask(BaseTask): """ Represents an operation task in the task_graph """ + SOURCE_OPERATION = 'source_operations' + TARGET_OPERATION = 'target_operations' + def __init__(self, name, actor, @@ -74,7 +77,7 @@ class _OperationTask(BaseTask): """ assert isinstance(actor, (storage.models.NodeInstance, storage.models.RelationshipInstance)) - super(_OperationTask, self).__init__() + super(OperationTask, self).__init__() self.actor = actor self.name = '{name}.{actor.id}'.format(name=name, actor=actor) self.operation_mapping = operation_mapping @@ -86,64 +89,45 @@ class _OperationTask(BaseTask): self.ignore_failure = (self.workflow_context._task_ignore_failure if ignore_failure is None else ignore_failure) - -class NodeOperationTask(_OperationTask): - """ - Represents a node based operation - """ - def __init__(self, - actor, - name, - *args, - **kwargs): + @classmethod + def node_instance(cls, actor, name, inputs=None, *args, **kwargs): """ - Represents a relationship based operation + Represents a node based operation :param actor: the node of which this operation belongs to. :param name: the name of the operation. """ assert isinstance(actor, storage.models.NodeInstance) - assert 'operation_details' not in kwargs - operation_mapping = actor.node.operations[name].get('operation', '') - operation_inputs = actor.node.operations[name].get('inputs', {}) - operation_inputs.update(kwargs.pop('inputs', {}) or {}) - super(NodeOperationTask, self).__init__(name=name, - actor=actor, - operation_mapping=operation_mapping, - inputs=operation_inputs, - *args, - **kwargs) - - -class RelationshipOperationTask(_OperationTask): - """ - Represents a relationship based operation - """ - def __init__(self, - actor, - name, - target, - *args, - **kwargs): + operation_details = actor.node.operations[name] + operation_inputs = operation_details.get('inputs', {}) + operation_inputs.update(inputs or {}) + return cls(name=name, + actor=actor, + operation_mapping=operation_details.get('operation', ''), + inputs=operation_inputs, + *args, + **kwargs) + + @classmethod + def relationship_instance(cls, name, actor, operation_end, inputs=None, *args, **kwargs): """ Represents a relationship based operation - :param actor: the relationship of which this operation belongs to. + :param actor: the node of which this operation belongs to. :param name: the name of the operation. - :param target: whether this is a source or target operation. + :param operation_end: source or target end of the relationship + :param inputs any additional inputs to the operation """ assert isinstance(actor, storage.models.RelationshipInstance) - assert 'operation_details' not in kwargs - side = 'target_operations' if target else 'source_operations' - operation_mapping = getattr(actor.relationship, side)[name].get('operation') - operation_inputs = getattr(actor.relationship, side)[name].get('inputs', {}) - operation_inputs.update(kwargs.pop('inputs', {})) - super(RelationshipOperationTask, self).__init__(actor=actor, - name=name, - operation_mapping=operation_mapping, - inputs=operation_inputs, - *args, - **kwargs) + operation_details = getattr(actor.relationship, operation_end)[name] + operation_inputs = operation_details.get('inputs', {}) + operation_inputs.update(inputs or {}) + return cls(actor=actor, + name=name, + operation_mapping=operation_details.get('operation'), + inputs=operation_inputs, + *args, + **kwargs) class WorkflowTask(BaseTask): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f59adc9c/aria/workflows/builtin/workflows.py ---------------------------------------------------------------------- diff --git a/aria/workflows/builtin/workflows.py b/aria/workflows/builtin/workflows.py index ee75dbb..b9fe6eb 100644 --- a/aria/workflows/builtin/workflows.py +++ b/aria/workflows/builtin/workflows.py @@ -42,13 +42,16 @@ def install_node_instance(graph, node_instance, **kwargs): :param node_instance: the node instance to install :return: """ - create_node_instance = task.NodeOperationTask(actor=node_instance, - name='aria.interfaces.lifecycle.create') + create_node_instance = task.OperationTask.node_instance( + actor=node_instance, + name='aria.interfaces.lifecycle.create') - configure_node_instance = task.NodeOperationTask(actor=node_instance, - name='aria.interfaces.lifecycle.configure') - start_node_instance = task.NodeOperationTask(actor=node_instance, - name='aria.interfaces.lifecycle.start') + configure_node_instance = task.OperationTask.node_instance( + actor=node_instance, + name='aria.interfaces.lifecycle.configure') + start_node_instance = task.OperationTask.node_instance( + actor=node_instance, + name='aria.interfaces.lifecycle.start') graph.sequence( create_node_instance, @@ -109,16 +112,18 @@ def establish_relationship(graph, node_instance, **kwargs): @workflow(suffix_template='{node_instance.id}') def uninstall_node_instance(graph, node_instance, **kwargs): """ - A workflow which uninstalls a node instance. - :param WorkflowContext context: the workflow context - :param TaskGraph graph: the tasks graph of which to edit - :param node_instance: the node instance to uninstall - :return: - """ - stop_node_instance = task.NodeOperationTask(actor=node_instance, - name='aria.interfaces.lifecycle.stop') - delete_node_instance = task.NodeOperationTask(actor=node_instance, - name='aria.interfaces.lifecycle.delete') + A workflow which uninstalls a node instance. + :param WorkflowContext context: the workflow context + :param TaskGraph graph: the tasks graph of which to edit + :param node_instance: the node instance to uninstall + :return: + """ + stop_node_instance = task.OperationTask.node_instance( + actor=node_instance, + name='aria.interfaces.lifecycle.stop') + delete_node_instance = task.OperationTask.node_instance( + actor=node_instance, + name='aria.interfaces.lifecycle.delete') graph.sequence( stop_node_instance, @@ -159,7 +164,7 @@ def execute_operation_on_instance( if allow_kwargs_override is not None: operation_kwargs['allow_kwargs_override'] = allow_kwargs_override - return task.NodeOperationTask( + return task.OperationTask.node_instance( actor=node_instance, name=operation, inputs=operation_kwargs) @@ -198,10 +203,13 @@ def relationship_tasks(relationship_instance, operation_name): :param index: the relationship index - enables pretty print :return: """ - source_operation = task.RelationshipOperationTask(actor=relationship_instance, - name=operation_name, - target=False) - target_operation = task.RelationshipOperationTask(actor=relationship_instance, - name=operation_name, - target=True) + source_operation = task.OperationTask.relationship_instance( + actor=relationship_instance, + name=operation_name, + operation_end=task.OperationTask.SOURCE_OPERATION) + target_operation = task.OperationTask.relationship_instance( + actor=relationship_instance, + name=operation_name, + operation_end=task.OperationTask.TARGET_OPERATION) + return source_operation, target_operation http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f59adc9c/aria/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/task.py b/aria/workflows/core/task.py index 9275caa..9ab5697 100644 --- a/aria/workflows/core/task.py +++ b/aria/workflows/core/task.py @@ -124,7 +124,8 @@ class OperationTask(BaseTask): elif isinstance(api_task.actor, models.RelationshipInstance): context_class = operation_context.RelationshipOperationContext else: - context_class = None + raise RuntimeError('No operation context could be created for {0}' + .format(api_task.actor.model_cls)) self._ctx = context_class(name=api_task.name, workflow_context=self._workflow_context, @@ -134,7 +135,7 @@ class OperationTask(BaseTask): self._update_fields = None @contextmanager - def update(self): + def _update(self): """ A context manager which puts the task into update mode, enabling fields update. :yields: None @@ -142,23 +143,23 @@ class OperationTask(BaseTask): self._update_fields = {} try: yield - task = self.model_context + task = self.model_task for key, value in self._update_fields.items(): setattr(task, key, value) - self.model_context = task + self.model_task = task finally: self._update_fields = None @property - def model_context(self): + def model_task(self): """ Returns the task model in storage :return: task in storage """ return self._workflow_context.model.task.get(self._task_id) - @model_context.setter - def model_context(self, value): + @model_task.setter + def model_task(self, value): self._workflow_context.model.task.store(value) @property @@ -175,7 +176,7 @@ class OperationTask(BaseTask): Returns the task status :return: task status """ - return self.model_context.status + return self.model_task.status @status.setter @_locked @@ -188,7 +189,7 @@ class OperationTask(BaseTask): Returns when the task started :return: when task started """ - return self.model_context.started_at + return self.model_task.started_at @started_at.setter @_locked @@ -201,7 +202,7 @@ class OperationTask(BaseTask): Returns when the task ended :return: when task ended """ - return self.model_context.ended_at + return self.model_task.ended_at @ended_at.setter @_locked @@ -214,7 +215,7 @@ class OperationTask(BaseTask): Returns the retry count for the task :return: retry count """ - return self.model_context.retry_count + return self.model_task.retry_count @retry_count.setter @_locked @@ -227,7 +228,7 @@ class OperationTask(BaseTask): Returns the minimum datetime in which the task can be executed :return: eta """ - return self.model_context.due_at + return self.model_task.due_at @due_at.setter @_locked @@ -236,6 +237,6 @@ class OperationTask(BaseTask): def __getattr__(self, attr): try: - return getattr(self.model_context, attr) + return getattr(self.model_task, attr) except AttributeError: return super(OperationTask, self).__getattribute__(attr) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f59adc9c/aria/workflows/core/translation.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/translation.py b/aria/workflows/core/translation.py index 8e55e14..b6cbdad 100644 --- a/aria/workflows/core/translation.py +++ b/aria/workflows/core/translation.py @@ -47,7 +47,7 @@ def build_execution_graph( dependencies, default=[start_task]) - if isinstance(api_task, api.task._OperationTask): + if isinstance(api_task, api.task.OperationTask): # Add the task an the dependencies operation_task = core_task.OperationTask(api_task) _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies) @@ -86,7 +86,7 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()): Returns task list from dependencies. """ return [execution_graph.node[dependency.id - if isinstance(dependency, (api.task._OperationTask, + if isinstance(dependency, (api.task.OperationTask, api.task.StubTask)) else _end_graph_suffix(dependency.id)]['task'] for dependency in dependencies] or default http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f59adc9c/aria/workflows/executor/celery.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/celery.py b/aria/workflows/executor/celery.py index 313d389..b1ba9be 100644 --- a/aria/workflows/executor/celery.py +++ b/aria/workflows/executor/celery.py @@ -46,7 +46,7 @@ class CeleryExecutor(BaseExecutor): self._tasks[task.id] = task self._results[task.id] = self._app.send_task( task.operation_mapping, - ctx=task.model_context, + ctx=task.context, kwargs=task.inputs, task_id=task.id, queue=self._get_queue(task)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f59adc9c/tests/context/__init__.py ---------------------------------------------------------------------- diff --git a/tests/context/__init__.py b/tests/context/__init__.py index 99cda92..afd5cd7 100644 --- a/tests/context/__init__.py +++ b/tests/context/__init__.py @@ -19,8 +19,6 @@ import pytest from aria.workflows.core import engine -global_test_holder = {} - def op_path(func, module_path=None): module_path = module_path or sys.modules[__name__].__name__ @@ -35,8 +33,3 @@ def execute(workflow_func, workflow_context, executor): graph = workflow_func(ctx=workflow_context) eng = engine.Engine(executor=executor, workflow_context=workflow_context, tasks_graph=graph) eng.execute() - - -@pytest.fixture(autouse=True) -def cleanup(): - global_test_holder.clear() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f59adc9c/tests/context/test_operation.py ---------------------------------------------------------------------- diff --git a/tests/context/test_operation.py b/tests/context/test_operation.py index 787d1f5..032367c 100644 --- a/tests/context/test_operation.py +++ b/tests/context/test_operation.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import sys - import pytest from aria import ( @@ -30,9 +28,10 @@ from . import ( op_path, op_name, execute, - global_test_holder, ) +global_test_holder = {} + @pytest.fixture def ctx(): @@ -53,20 +52,19 @@ def test_node_operation_task_execution(ctx, executor): node = mock.models.get_dependency_node() node.operations[operation_name] = { - 'operation': op_path(my_operation, module_path=sys.modules[__name__].__name__) + 'operation': op_path(my_operation, module_path=__name__) } node_instance = mock.models.get_dependency_node_instance(node) ctx.model.node.store(node) ctx.model.node_instance.store(node_instance) - node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) inputs = {'putput': True} @workflow def basic_workflow(graph, **_): graph.add_tasks( - api.task.NodeOperationTask( + api.task.OperationTask.node_instance( name=operation_name, actor=node_instance, inputs=inputs @@ -75,23 +73,19 @@ def test_node_operation_task_execution(ctx, executor): execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor) - operation_value = global_test_holder[op_name(node_instance, operation_name)] - - assert isinstance(operation_value, context.operation.NodeOperationContext) + operation_context = global_test_holder[op_name(node_instance, operation_name)] - # operation container based attributes - for key, value in node_instance.fields_dict.items(): - assert getattr(operation_value._actor, key) == value + assert isinstance(operation_context, context.operation.NodeOperationContext) # Task bases assertions - assert operation_value.task.actor == node_instance - assert operation_value.task.name == op_name(node_instance, operation_name) - assert operation_value.task.operation_mapping == node.operations[operation_name]['operation'] - assert operation_value.task.inputs == inputs + assert operation_context.task.actor == node_instance + assert operation_context.task.name == op_name(node_instance, operation_name) + assert operation_context.task.operation_mapping == node.operations[operation_name]['operation'] + assert operation_context.task.inputs == inputs # Context based attributes (sugaring) - assert operation_value.node == node_instance.node - assert operation_value.node_instance == node_instance + assert operation_context.node == node_instance.node + assert operation_context.node_instance == node_instance def test_relationship_operation_task_execution(ctx, executor): @@ -101,7 +95,7 @@ def test_relationship_operation_task_execution(ctx, executor): dependency_node_instance = mock.models.get_dependency_node_instance() relationship = mock.models.get_relationship(target=dependency_node) relationship.source_operations[operation_name] = { - 'operation': op_path(my_operation, module_path=sys.modules[__name__].__name__) + 'operation': op_path(my_operation, module_path=__name__) } relationship_instance = mock.models.get_relationship_instance( target_instance=dependency_node_instance, @@ -122,40 +116,41 @@ def test_relationship_operation_task_execution(ctx, executor): @workflow def basic_workflow(graph, **_): graph.add_tasks( - api.task.RelationshipOperationTask( + api.task.OperationTask.relationship_instance( actor=relationship_instance, name=operation_name, - target=False, + operation_end=api.task.OperationTask.SOURCE_OPERATION, inputs=inputs ) ) execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor) - operation_value = global_test_holder[op_name(relationship_instance, operation_name)] + operation_context = global_test_holder[op_name(relationship_instance, operation_name)] - assert isinstance(operation_value, context.operation.RelationshipOperationContext) - - # operation container based attributes - for key, value in relationship_instance.fields_dict.items(): - assert getattr(operation_value._actor, key) == value + assert isinstance(operation_context, context.operation.RelationshipOperationContext) # Task bases assertions - assert operation_value.task.actor == relationship_instance - assert operation_value.task.name == op_name(relationship_instance, operation_name) - assert operation_value.task.operation_mapping == \ + assert operation_context.task.actor == relationship_instance + assert operation_context.task.name == op_name(relationship_instance, operation_name) + assert operation_context.task.operation_mapping == \ relationship.source_operations[operation_name]['operation'] - assert operation_value.task.inputs == inputs + assert operation_context.task.inputs == inputs # Context based attributes (sugaring) - assert operation_value.target_node == dependency_node - assert operation_value.target_node_instance == dependency_node_instance - assert operation_value.relationship == relationship - assert operation_value.relationship_instance == relationship_instance - assert operation_value.node == dependent_node - assert operation_value.node_instance == dependent_node_instance + assert operation_context.target_node == dependency_node + assert operation_context.target_node_instance == dependency_node_instance + assert operation_context.relationship == relationship + assert operation_context.relationship_instance == relationship_instance + assert operation_context.source_node == dependent_node + assert operation_context.source_node_instance == dependent_node_instance @operation def my_operation(ctx, **_): global_test_holder[ctx.name] = ctx + + +@pytest.fixture(autouse=True) +def cleanup(): + global_test_holder.clear() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f59adc9c/tests/context/test_toolbelt.py ---------------------------------------------------------------------- diff --git a/tests/context/test_toolbelt.py b/tests/context/test_toolbelt.py index 0208738..2e4ef5a 100644 --- a/tests/context/test_toolbelt.py +++ b/tests/context/test_toolbelt.py @@ -13,23 +13,22 @@ # See the License for the specific language governing permissions and # limitations under the License. -import sys - import pytest -from aria import workflow, operation, context, exceptions +from aria import workflow, operation, context from aria.workflows import api from aria.workflows.executor import thread -from aria.context.toolbelt import _RelationshipToolBelt, _NodeToolBelt +from aria.context.toolbelt import RelationshipToolBelt, NodeToolBelt from .. import mock from . import ( op_path, op_name, execute, - global_test_holder, ) +global_test_holder = {} + @pytest.fixture def workflow_context(): @@ -45,13 +44,8 @@ def executor(): result.close() -def test_operation_tool_belt(workflow_context, executor): - operation_name = 'aria.interfaces.lifecycle.create' +def _create_blueprint_in_storate(workflow_context): dependency_node = mock.models.get_dependency_node() - dependency_node.operations[operation_name] = { - 'operation': op_path(node_operation, module_path=sys.modules[__name__].__name__) - - } dependency_node_instance = mock.models.get_dependency_node_instance( dependency_node=dependency_node) relationship = mock.models.get_relationship(target=dependency_node) @@ -66,13 +60,26 @@ def test_operation_tool_belt(workflow_context, executor): workflow_context.model.relationship_instance.store(relationship_instance) workflow_context.model.node.store(dependent_node) workflow_context.model.node_instance.store(dependent_node_instance) + return dependency_node, dependency_node_instance, \ + dependent_node, dependent_node_instance, \ + relationship, relationship_instance + + +def test_relationship_to_me(workflow_context, executor): + operation_name = 'aria.interfaces.lifecycle.create' + dependency_node, dependency_node_instance, _, _, _, relationship_instance = \ + _create_blueprint_in_storate(workflow_context) + dependency_node.operations[operation_name] = { + 'operation': op_path(relationships_to_me, module_path=__name__) + } + workflow_context.model.node.store(dependency_node) inputs = {'putput': True} @workflow def basic_workflow(graph, **_): graph.add_tasks( - api.task.NodeOperationTask( + api.task.OperationTask.node_instance( actor=dependency_node_instance, name=operation_name, inputs=inputs @@ -81,45 +88,82 @@ def test_operation_tool_belt(workflow_context, executor): execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor) - assert isinstance(global_test_holder.get(op_name(dependency_node_instance, operation_name)), - _NodeToolBelt) assert list(global_test_holder.get('relationships_to_me', [])) == list([relationship_instance]) - assert global_test_holder.get('ip') == dependency_node_instance.runtime_properties.get('ip') + + +def test_host_ip(workflow_context, executor): + operation_name = 'aria.interfaces.lifecycle.create' + dependency_node, dependency_node_instance, _, _, _, relationship_instance = \ + _create_blueprint_in_storate(workflow_context) + dependency_node.operations[operation_name] = { + 'operation': op_path(host_ip, module_path=__name__) + + } + workflow_context.model.node.store(dependency_node) + inputs = {'putput': True} + + @workflow + def basic_workflow(graph, **_): + graph.add_tasks( + api.task.OperationTask.node_instance( + actor=dependency_node_instance, + name=operation_name, + inputs=inputs + ) + ) + + execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor) + + assert global_test_holder.get('host_ip') == \ + dependency_node_instance.runtime_properties.get('ip') + + +def test_dependent_node_instances(workflow_context, executor): + operation_name = 'aria.interfaces.lifecycle.create' + dependency_node, dependency_node_instance, \ + _, dependent_node_instance, \ + _, relationship_instance = \ + _create_blueprint_in_storate(workflow_context) + dependency_node.operations[operation_name] = { + 'operation': op_path(dependent_nodes, module_path=__name__) + + } + workflow_context.model.node.store(dependency_node) + inputs = {'putput': True} + + @workflow + def basic_workflow(graph, **_): + graph.add_tasks( + api.task.OperationTask.node_instance( + actor=dependency_node_instance, + name=operation_name, + inputs=inputs + ) + ) + + execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor) + + assert list(global_test_holder.get('dependent_node_instances', [])) == \ + list([dependent_node_instance]) def test_relationship_tool_belt(workflow_context, executor): operation_name = 'aria.interfaces.relationship_lifecycle.postconfigure' - - dependency_node = mock.models.get_dependency_node() - dependency_node_instance = \ - mock.models.get_dependency_node_instance(dependency_node=dependency_node) - relationship = mock.models.get_relationship(target=dependency_node) + _, _, _, _, relationship, relationship_instance = _create_blueprint_in_storate(workflow_context) relationship.source_operations[operation_name] = { - 'operation': op_path(relationship_operation, module_path=sys.modules[__name__].__name__) + 'operation': op_path(relationship_operation, module_path=__name__) } - relationship_instance = \ - mock.models.get_relationship_instance(target_instance=dependency_node_instance, - relationship=relationship) - dependent_node = mock.models.get_dependent_node(relationship=relationship) - dependent_node_instance = \ - mock.models.get_dependent_node_instance(relationship_instance=relationship_instance, - dependent_node=dependent_node) - workflow_context.model.node.store(dependency_node) - workflow_context.model.node_instance.store(dependency_node_instance) workflow_context.model.relationship.store(relationship) - workflow_context.model.relationship_instance.store(relationship_instance) - workflow_context.model.node.store(dependent_node) - workflow_context.model.node_instance.store(dependent_node_instance) inputs = {'putput': True} @workflow def basic_workflow(graph, **_): graph.add_tasks( - api.task.RelationshipOperationTask( + api.task.OperationTask.relationship_instance( actor=relationship_instance, name=operation_name, - target=False, + operation_end=api.task.OperationTask.SOURCE_OPERATION, inputs=inputs ) ) @@ -127,21 +171,34 @@ def test_relationship_tool_belt(workflow_context, executor): execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor) assert isinstance(global_test_holder.get(op_name(relationship_instance, operation_name)), - _RelationshipToolBelt) + RelationshipToolBelt) def test_wrong_model_toolbelt(): - with pytest.raises(exceptions.TaskException): + with pytest.raises(RuntimeError): context.toolbelt(None) @operation(toolbelt=True) -def node_operation(ctx, toolbelt, **_): +def relationships_to_me(toolbelt, **_): global_test_holder['relationships_to_me'] = list(toolbelt.relationships_to_me) - global_test_holder['ip'] = toolbelt.host_ip - global_test_holder[ctx.name] = toolbelt + + +@operation(toolbelt=True) +def host_ip(toolbelt, **_): + global_test_holder['host_ip'] = toolbelt.host_ip + + +@operation(toolbelt=True) +def dependent_nodes(toolbelt, **_): + global_test_holder['dependent_node_instances'] = list(toolbelt.dependent_node_instances) @operation(toolbelt=True) def relationship_operation(ctx, toolbelt, **_): global_test_holder[ctx.name] = toolbelt + + +@pytest.fixture(autouse=True) +def cleanup(): + global_test_holder.clear() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f59adc9c/tests/workflows/api/test_task.py ---------------------------------------------------------------------- diff --git a/tests/workflows/api/test_task.py b/tests/workflows/api/test_task.py index bd7ae27..257f0d4 100644 --- a/tests/workflows/api/test_task.py +++ b/tests/workflows/api/test_task.py @@ -74,12 +74,13 @@ class TestOperationTask(object): ignore_failure = True with context.workflow.current.push(workflow_context): - api_task = api.task.NodeOperationTask(name=operation_name, - actor=node_instance, - inputs=inputs, - max_attempts=max_attempts, - retry_interval=retry_interval, - ignore_failure=ignore_failure) + api_task = api.task.OperationTask.node_instance( + name=operation_name, + actor=node_instance, + inputs=inputs, + max_attempts=max_attempts, + retry_interval=retry_interval, + ignore_failure=ignore_failure) assert api_task.name == '{0}.{1}'.format(operation_name, node_instance.id) assert api_task.operation_mapping is True @@ -102,12 +103,13 @@ class TestOperationTask(object): retry_interval = 10 with context.workflow.current.push(workflow_context): - api_task = api.task.RelationshipOperationTask(name=operation_name, - actor=relationship_instance, - target=False, - inputs=inputs, - max_attempts=max_attempts, - retry_interval=retry_interval) + api_task = api.task.OperationTask.relationship_instance( + name=operation_name, + actor=relationship_instance, + operation_end=api.task.OperationTask.SOURCE_OPERATION, + inputs=inputs, + max_attempts=max_attempts, + retry_interval=retry_interval) assert api_task.name == '{0}.{1}'.format(operation_name, relationship_instance.id) assert api_task.operation_mapping is True @@ -119,7 +121,7 @@ class TestOperationTask(object): def test_operation_task_default_values(self): workflow_context = mock.context.simple(task_ignore_failure=True) with context.workflow.current.push(workflow_context): - model_task = api.task._OperationTask( + model_task = api.task.OperationTask( name='stub', operation_mapping='', actor=mock.models.get_dependency_node_instance()) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f59adc9c/tests/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_engine.py b/tests/workflows/core/test_engine.py index cad3312..eb67006 100644 --- a/tests/workflows/core/test_engine.py +++ b/tests/workflows/core/test_engine.py @@ -70,7 +70,7 @@ class BaseTest(object): 'operation': 'tests.workflows.core.test_engine.{name}'.format(name=func.__name__) } ctx.model.node_instance.store(node_instance) - return api.task.NodeOperationTask( + return api.task.OperationTask.node_instance( actor=node_instance, name='aria.interfaces.lifecycle.create', inputs=inputs, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f59adc9c/tests/workflows/core/test_task.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_task.py b/tests/workflows/core/test_task.py index d0b329a..ce4be0a 100644 --- a/tests/workflows/core/test_task.py +++ b/tests/workflows/core/test_task.py @@ -52,7 +52,7 @@ class TestOperationTask(object): def _create_operation_task(self, ctx, node_instance): with workflow_context.current.push(ctx): - api_task = api.task.NodeOperationTask( + api_task = api.task.OperationTask.node_instance( actor=node_instance, name='aria.interfaces.lifecycle.create', ) @@ -66,7 +66,7 @@ class TestOperationTask(object): api_task, core_task = self._create_operation_task(ctx, node_instance) storage_task = ctx.model.task.get(core_task.id) - assert core_task.model_context == storage_task + assert core_task.model_task == storage_task assert core_task.name == api_task.name assert core_task.operation_mapping == api_task.operation_mapping assert core_task.actor == api_task.actor == node_instance @@ -94,7 +94,7 @@ class TestOperationTask(object): _, core_task = self._create_operation_task(ctx, node_instance) future_time = datetime.utcnow() + timedelta(seconds=3) - with core_task.update(): + with core_task._update(): core_task.status = core_task.STARTED core_task.started_at = future_time core_task.ended_at = future_time http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f59adc9c/tests/workflows/core/test_task_graph_into_exececution_graph.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_task_graph_into_exececution_graph.py b/tests/workflows/core/test_task_graph_into_exececution_graph.py index e48fc9a..b10ed28 100644 --- a/tests/workflows/core/test_task_graph_into_exececution_graph.py +++ b/tests/workflows/core/test_task_graph_into_exececution_graph.py @@ -38,11 +38,14 @@ def test_task_graph_into_execution_graph(): with context.workflow.current.push(task_context): test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph') - simple_before_task = api.task.NodeOperationTask(node_instance, operation_name) - simple_after_task = api.task.NodeOperationTask(node_instance, operation_name) + simple_before_task = api.task.OperationTask.node_instance(actor=node_instance, + name=operation_name) + simple_after_task = api.task.OperationTask.node_instance(actor=node_instance, + name=operation_name) inner_task_graph = api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph') - inner_task = api.task.NodeOperationTask(node_instance, operation_name) + inner_task = api.task.OperationTask.node_instance(actor=node_instance, + name=operation_name) inner_task_graph.add_tasks(inner_task) test_task_graph.add_tasks(simple_before_task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f59adc9c/tests/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/workflows/executor/test_executor.py b/tests/workflows/executor/test_executor.py index 6c7551c..df2728c 100644 --- a/tests/workflows/executor/test_executor.py +++ b/tests/workflows/executor/test_executor.py @@ -124,7 +124,7 @@ class MockTask(object): setattr(self, state.upper(), state) @contextmanager - def update(self): + def _update(self): yield self