Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-9-API-for-operation-context 5f5b858ec -> 5956f8899 (forced update)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/tests/workflows/api/test_task.py ---------------------------------------------------------------------- diff --git a/tests/workflows/api/test_task.py b/tests/workflows/api/test_task.py index 50bca45..58f9760 100644 --- a/tests/workflows/api/test_task.py +++ b/tests/workflows/api/test_task.py @@ -60,46 +60,76 @@ def ctx(): class TestOperationTask(object): - def test_operation_task_creation(self): + def test_node_operation_task_creation(self): workflow_context = mock.context.simple() - name = 'task_name' - op_details = {'operation_details': True} - node_instance = mock.models.get_dependency_node_instance() + operation_name = 'aria.interfaces.lifecycle.create' + op_details = {'operation': True} + node = mock.models.get_dependency_node() + node.operations[operation_name] = op_details + node_instance = mock.models.get_dependency_node_instance(dependency_node=node) inputs = {'inputs': True} max_attempts = 10 retry_interval = 10 ignore_failure = True with context.workflow.current.push(workflow_context): - model_task = api.task.OperationTask(name=name, - operation_details=op_details, - node_instance=node_instance, - inputs=inputs, - max_attempts=max_attempts, - retry_interval=retry_interval, - ignore_failure=ignore_failure) - - assert model_task.name == name - assert model_task.operation_details == op_details - assert model_task.node_instance == node_instance - assert model_task.inputs == inputs - assert model_task.retry_interval == retry_interval - assert model_task.max_attempts == max_attempts - assert model_task.ignore_failure == ignore_failure + api_task = api.task.OperationTask.node_instance( + name=operation_name, + instance=node_instance, + inputs=inputs, + max_attempts=max_attempts, + retry_interval=retry_interval, + ignore_failure=ignore_failure) + + assert api_task.name == '{0}.{1}'.format(operation_name, node_instance.id) + assert api_task.operation_mapping is True + assert api_task.actor == node_instance + assert api_task.inputs == inputs + assert api_task.retry_interval == retry_interval + assert api_task.max_attempts == max_attempts + assert api_task.ignore_failure == ignore_failure + + def test_relationship_operation_task_creation(self): + workflow_context = mock.context.simple() + + operation_name = 'aria.interfaces.relationship_lifecycle.preconfigure' + op_details = {'operation': True} + relationship = mock.models.get_relationship() + relationship.source_operations[operation_name] = op_details + relationship_instance = mock.models.get_relationship_instance(relationship=relationship) + inputs = {'inputs': True} + max_attempts = 10 + retry_interval = 10 + + with context.workflow.current.push(workflow_context): + api_task = api.task.OperationTask.relationship_instance( + name=operation_name, + instance=relationship_instance, + operation_end=api.task.OperationTask.SOURCE_OPERATION, + inputs=inputs, + max_attempts=max_attempts, + retry_interval=retry_interval) + + assert api_task.name == '{0}.{1}'.format(operation_name, relationship_instance.id) + assert api_task.operation_mapping is True + assert api_task.actor == relationship_instance + assert api_task.inputs == inputs + assert api_task.retry_interval == retry_interval + assert api_task.max_attempts == max_attempts def test_operation_task_default_values(self): workflow_context = mock.context.simple(task_ignore_failure=True) with context.workflow.current.push(workflow_context): model_task = api.task.OperationTask( name='stub', - operation_details={}, - node_instance=mock.models.get_dependency_node_instance()) + operation_mapping='', + actor=mock.models.get_dependency_node_instance()) assert model_task.inputs == {} - assert model_task.retry_interval == workflow_context.task_retry_interval - assert model_task.max_attempts == workflow_context.task_max_attempts - assert model_task.ignore_failure == workflow_context.task_ignore_failure + assert model_task.retry_interval == workflow_context._task_retry_interval + assert model_task.max_attempts == workflow_context._task_max_attempts + assert model_task.ignore_failure == workflow_context._task_ignore_failure class TestWorkflowTask(object): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/tests/workflows/builtin/test_execute_operation.py ---------------------------------------------------------------------- diff --git a/tests/workflows/builtin/test_execute_operation.py b/tests/workflows/builtin/test_execute_operation.py index 9409686..0bd59ed 100644 --- a/tests/workflows/builtin/test_execute_operation.py +++ b/tests/workflows/builtin/test_execute_operation.py @@ -46,6 +46,6 @@ def test_execute_operation(ctx): ) assert len(execute_tasks) == 1 - assert execute_tasks[0].name == '{0}.{1}'.format(node_instance_id, operation_name) + assert execute_tasks[0].name == '{0}.{1}'.format(operation_name, node_instance_id) # TODO: add more scenarios http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/tests/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_engine.py b/tests/workflows/core/test_engine.py index 6377a69..9e12c0b 100644 --- a/tests/workflows/core/test_engine.py +++ b/tests/workflows/core/test_engine.py @@ -20,18 +20,24 @@ from datetime import datetime import pytest import aria -from aria import events -from aria import workflow -from aria import context +from aria import ( + events, + workflow, + operation, + context +) from aria.storage import models -from aria.workflows import exceptions -from aria.workflows.executor import thread +from aria.workflows import ( + api, + exceptions, +) from aria.workflows.core import engine -from aria.workflows import api +from aria.workflows.executor import thread -from tests import mock import tests.storage +from tests import mock + global_test_holder = {} @@ -59,11 +65,14 @@ class BaseTest(object): max_attempts=None, retry_interval=None, ignore_failure=None): - return api.task.OperationTask( - name='task', - operation_details={'operation': 'tests.workflows.core.test_engine.{name}'.format( - name=func.__name__)}, - node_instance=ctx.model.node_instance.get('dependency_node_instance'), + node_instance = ctx.model.node_instance.get('dependency_node_instance') + node_instance.node.operations['aria.interfaces.lifecycle.create'] = { + 'operation': 'tests.workflows.core.test_engine.{name}'.format(name=func.__name__) + } + ctx.model.node_instance.store(node_instance) + return api.task.OperationTask.node_instance( + instance=node_instance, + name='aria.interfaces.lifecycle.create', inputs=inputs, max_attempts=max_attempts, retry_interval=retry_interval, @@ -122,13 +131,9 @@ class BaseTest(object): def workflow_context(self): model_storage = aria.application_model_storage(tests.storage.InMemoryModelDriver()) model_storage.setup() - deployment = models.Deployment( - id='d1', - blueprint_id='b1', - description=None, - created_at=datetime.utcnow(), - updated_at=datetime.utcnow(), - workflows={}) + blueprint = mock.models.get_blueprint() + deployment = mock.models.get_deployment() + model_storage.blueprint.store(blueprint) model_storage.deployment.store(deployment) node = mock.models.get_dependency_node() node_instance = mock.models.get_dependency_node_instance(node) @@ -396,20 +401,24 @@ class TestRetries(BaseTest): assert global_test_holder.get('sent_task_signal_calls') == 1 -def mock_success_task(): +@operation +def mock_success_task(**_): pass -def mock_failed_task(): +@operation +def mock_failed_task(**_): raise RuntimeError -def mock_ordered_task(counter): +@operation +def mock_ordered_task(counter, **_): invocations = global_test_holder.setdefault('invocations', []) invocations.append(counter) -def mock_conditional_failure_task(failure_count): +@operation +def mock_conditional_failure_task(failure_count, **_): invocations = global_test_holder.setdefault('invocations', []) try: if len(invocations) < failure_count: @@ -418,7 +427,7 @@ def mock_conditional_failure_task(failure_count): invocations.append(time.time()) -def mock_sleep_task(seconds): +def mock_sleep_task(seconds, **_): invocations = global_test_holder.setdefault('invocations', []) invocations.append(time.time()) time.sleep(seconds) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/tests/workflows/core/test_task.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_task.py b/tests/workflows/core/test_task.py new file mode 100644 index 0000000..56e49c6 --- /dev/null +++ b/tests/workflows/core/test_task.py @@ -0,0 +1,113 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from datetime import ( + datetime, + timedelta +) + +import pytest + +from aria.context import workflow as workflow_context +from aria.workflows import ( + api, + core, + exceptions, +) + +from ... import mock + + [email protected] +def ctx(): + simple_context = mock.context.simple() + + blueprint = mock.models.get_blueprint() + deployment = mock.models.get_deployment() + node = mock.models.get_dependency_node() + node_instance = mock.models.get_dependency_node_instance(node) + execution = mock.models.get_execution() + + simple_context.model.blueprint.store(blueprint) + simple_context.model.deployment.store(deployment) + simple_context.model.node.store(node) + simple_context.model.node_instance.store(node_instance) + simple_context.model.execution.store(execution) + + return simple_context + + +class TestOperationTask(object): + + def _create_operation_task(self, ctx, node_instance): + with workflow_context.current.push(ctx): + api_task = api.task.OperationTask.node_instance( + instance=node_instance, + name='aria.interfaces.lifecycle.create', + ) + + core_task = core.task.OperationTask(api_task=api_task) + + return api_task, core_task + + def test_operation_task_creation(self, ctx): + node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) + api_task, core_task = self._create_operation_task(ctx, node_instance) + storage_task = ctx.model.task.get(core_task.id) + + assert core_task.model_task == storage_task + assert core_task.name == api_task.name + assert core_task.operation_mapping == api_task.operation_mapping + assert core_task.actor == api_task.actor == node_instance + assert core_task.inputs == api_task.inputs == storage_task.inputs + + def test_operation_task_edit_locked_attribute(self, ctx): + node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) + + _, core_task = self._create_operation_task(ctx, node_instance) + now = datetime.utcnow() + with pytest.raises(exceptions.TaskException): + core_task.status = core_task.STARTED + with pytest.raises(exceptions.TaskException): + core_task.started_at = now + with pytest.raises(exceptions.TaskException): + core_task.ended_at = now + with pytest.raises(exceptions.TaskException): + core_task.retry_count = 2 + with pytest.raises(exceptions.TaskException): + core_task.due_at = now + + def test_operation_task_edit_attributes(self, ctx): + node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) + + _, core_task = self._create_operation_task(ctx, node_instance) + future_time = datetime.utcnow() + timedelta(seconds=3) + + with core_task._update(): + core_task.status = core_task.STARTED + core_task.started_at = future_time + core_task.ended_at = future_time + core_task.retry_count = 2 + core_task.eta = future_time + assert core_task.status != core_task.STARTED + assert core_task.started_at != future_time + assert core_task.ended_at != future_time + assert core_task.retry_count != 2 + assert core_task.due_at != future_time + + assert core_task.status == core_task.STARTED + assert core_task.started_at == future_time + assert core_task.ended_at == future_time + assert core_task.retry_count == 2 + assert core_task.eta == future_time http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/tests/workflows/core/test_task_graph_into_exececution_graph.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_task_graph_into_exececution_graph.py b/tests/workflows/core/test_task_graph_into_exececution_graph.py index 75e825f..2e7c7df 100644 --- a/tests/workflows/core/test_task_graph_into_exececution_graph.py +++ b/tests/workflows/core/test_task_graph_into_exececution_graph.py @@ -22,22 +22,30 @@ from ... import mock def test_task_graph_into_execution_graph(): + operation_name = 'aria.interfaces.lifecycle.create' task_context = mock.context.simple() node = mock.models.get_dependency_node() node_instance = mock.models.get_dependency_node_instance() + deployment = mock.models.get_deployment() + execution = mock.models.get_execution() task_context.model.node.store(node) task_context.model.node_instance.store(node_instance) + task_context.model.deployment.store(deployment) + task_context.model.execution.store(execution) def sub_workflow(name, **_): return api.task_graph.TaskGraph(name) with context.workflow.current.push(task_context): test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph') - simple_before_task = api.task.OperationTask('test_simple_before_task', {}, node_instance) - simple_after_task = api.task.OperationTask('test_simple_after_task', {}, node_instance) + simple_before_task = api.task.OperationTask.node_instance(instance=node_instance, + name=operation_name) + simple_after_task = api.task.OperationTask.node_instance(instance=node_instance, + name=operation_name) inner_task_graph = api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph') - inner_task = api.task.OperationTask('test_inner_task', {}, node_instance) + inner_task = api.task.OperationTask.node_instance(instance=node_instance, + name=operation_name) inner_task_graph.add_tasks(inner_task) test_task_graph.add_tasks(simple_before_task) @@ -88,8 +96,8 @@ def test_task_graph_into_execution_graph(): def _assert_execution_is_api_task(execution_task, api_task): assert execution_task.id == api_task.id assert execution_task.name == api_task.name - assert execution_task.operation_details == api_task.operation_details - assert execution_task.node_instance == api_task.node_instance + assert execution_task.operation_mapping == api_task.operation_mapping + assert execution_task.actor == api_task.actor assert execution_task.inputs == api_task.inputs http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/tests/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/workflows/executor/test_executor.py b/tests/workflows/executor/test_executor.py index 0faa753..df2728c 100644 --- a/tests/workflows/executor/test_executor.py +++ b/tests/workflows/executor/test_executor.py @@ -81,15 +81,15 @@ class TestExecutor(object): self.executor.close() -def mock_successful_task(): +def mock_successful_task(**_): pass -def mock_failing_task(): +def mock_failing_task(**_): raise MockException -def mock_task_with_input(input): +def mock_task_with_input(input, **_): raise MockException(input) if app: @@ -106,16 +106,17 @@ class MockTask(object): INFINITE_RETRIES = models.Task.INFINITE_RETRIES - def __init__(self, func, inputs=None): + def __init__(self, func, inputs=None, ctx=None): self.states = [] self.exception = None self.id = str(uuid.uuid4()) name = func.__name__ operation = 'tests.workflows.executor.test_executor.{name}'.format(name=name) - self.operation_details = {'operation': operation} + self.operation_mapping = operation self.logger = logging.getLogger() self.name = name self.inputs = inputs or {} + self.context = ctx or None self.retry_count = 0 self.max_attempts = 1 @@ -123,7 +124,7 @@ class MockTask(object): setattr(self, state.upper(), state) @contextmanager - def update(self): + def _update(self): yield self
