Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-9-API-for-operation-context bd22e8ebf -> f32756128 (forced update)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f3275612/tests/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_engine.py b/tests/workflows/core/test_engine.py index 6377a69..cad3312 100644 --- a/tests/workflows/core/test_engine.py +++ b/tests/workflows/core/test_engine.py @@ -20,18 +20,24 @@ from datetime import datetime import pytest import aria -from aria import events -from aria import workflow -from aria import context +from aria import ( + events, + workflow, + operation, + context +) from aria.storage import models -from aria.workflows import exceptions -from aria.workflows.executor import thread +from aria.workflows import ( + api, + exceptions, +) from aria.workflows.core import engine -from aria.workflows import api +from aria.workflows.executor import thread -from tests import mock import tests.storage +from tests import mock + global_test_holder = {} @@ -59,11 +65,14 @@ class BaseTest(object): max_attempts=None, retry_interval=None, ignore_failure=None): - return api.task.OperationTask( - name='task', - operation_details={'operation': 'tests.workflows.core.test_engine.{name}'.format( - name=func.__name__)}, - node_instance=ctx.model.node_instance.get('dependency_node_instance'), + node_instance = ctx.model.node_instance.get('dependency_node_instance') + node_instance.node.operations['aria.interfaces.lifecycle.create'] = { + 'operation': 'tests.workflows.core.test_engine.{name}'.format(name=func.__name__) + } + ctx.model.node_instance.store(node_instance) + return api.task.NodeOperationTask( + actor=node_instance, + name='aria.interfaces.lifecycle.create', inputs=inputs, max_attempts=max_attempts, retry_interval=retry_interval, @@ -122,13 +131,9 @@ class BaseTest(object): def workflow_context(self): model_storage = aria.application_model_storage(tests.storage.InMemoryModelDriver()) model_storage.setup() - deployment = models.Deployment( - id='d1', - 
blueprint_id='b1', - description=None, - created_at=datetime.utcnow(), - updated_at=datetime.utcnow(), - workflows={}) + blueprint = mock.models.get_blueprint() + deployment = mock.models.get_deployment() + model_storage.blueprint.store(blueprint) model_storage.deployment.store(deployment) node = mock.models.get_dependency_node() node_instance = mock.models.get_dependency_node_instance(node) @@ -396,20 +401,24 @@ class TestRetries(BaseTest): assert global_test_holder.get('sent_task_signal_calls') == 1 -def mock_success_task(): +@operation +def mock_success_task(**_): pass -def mock_failed_task(): +@operation +def mock_failed_task(**_): raise RuntimeError -def mock_ordered_task(counter): +@operation +def mock_ordered_task(counter, **_): invocations = global_test_holder.setdefault('invocations', []) invocations.append(counter) -def mock_conditional_failure_task(failure_count): +@operation +def mock_conditional_failure_task(failure_count, **_): invocations = global_test_holder.setdefault('invocations', []) try: if len(invocations) < failure_count: @@ -418,7 +427,7 @@ def mock_conditional_failure_task(failure_count): invocations.append(time.time()) -def mock_sleep_task(seconds): +def mock_sleep_task(seconds, **_): invocations = global_test_holder.setdefault('invocations', []) invocations.append(time.time()) time.sleep(seconds) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f3275612/tests/workflows/core/test_task.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_task.py b/tests/workflows/core/test_task.py new file mode 100644 index 0000000..d0b329a --- /dev/null +++ b/tests/workflows/core/test_task.py @@ -0,0 +1,113 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from datetime import ( + datetime, + timedelta +) + +import pytest + +from aria.context import workflow as workflow_context +from aria.workflows import ( + api, + core, + exceptions, +) + +from ... import mock + + +@pytest.fixture +def ctx(): + simple_context = mock.context.simple() + + blueprint = mock.models.get_blueprint() + deployment = mock.models.get_deployment() + node = mock.models.get_dependency_node() + node_instance = mock.models.get_dependency_node_instance(node) + execution = mock.models.get_execution() + + simple_context.model.blueprint.store(blueprint) + simple_context.model.deployment.store(deployment) + simple_context.model.node.store(node) + simple_context.model.node_instance.store(node_instance) + simple_context.model.execution.store(execution) + + return simple_context + + +class TestOperationTask(object): + + def _create_operation_task(self, ctx, node_instance): + with workflow_context.current.push(ctx): + api_task = api.task.NodeOperationTask( + actor=node_instance, + name='aria.interfaces.lifecycle.create', + ) + + core_task = core.task.OperationTask(api_task=api_task) + + return api_task, core_task + + def test_operation_task_creation(self, ctx): + node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) + api_task, core_task = self._create_operation_task(ctx, node_instance) + storage_task = ctx.model.task.get(core_task.id) + + assert 
core_task.model_context == storage_task + assert core_task.name == api_task.name + assert core_task.operation_mapping == api_task.operation_mapping + assert core_task.actor == api_task.actor == node_instance + assert core_task.inputs == api_task.inputs == storage_task.inputs + + def test_operation_task_edit_locked_attribute(self, ctx): + node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) + + _, core_task = self._create_operation_task(ctx, node_instance) + now = datetime.utcnow() + with pytest.raises(exceptions.TaskException): + core_task.status = core_task.STARTED + with pytest.raises(exceptions.TaskException): + core_task.started_at = now + with pytest.raises(exceptions.TaskException): + core_task.ended_at = now + with pytest.raises(exceptions.TaskException): + core_task.retry_count = 2 + with pytest.raises(exceptions.TaskException): + core_task.due_at = now + + def test_operation_task_edit_attributes(self, ctx): + node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) + + _, core_task = self._create_operation_task(ctx, node_instance) + future_time = datetime.utcnow() + timedelta(seconds=3) + + with core_task.update(): + core_task.status = core_task.STARTED + core_task.started_at = future_time + core_task.ended_at = future_time + core_task.retry_count = 2 + core_task.eta = future_time + assert core_task.status != core_task.STARTED + assert core_task.started_at != future_time + assert core_task.ended_at != future_time + assert core_task.retry_count != 2 + assert core_task.due_at != future_time + + assert core_task.status == core_task.STARTED + assert core_task.started_at == future_time + assert core_task.ended_at == future_time + assert core_task.retry_count == 2 + assert core_task.eta == future_time http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f3275612/tests/workflows/core/test_task_graph_into_exececution_graph.py 
---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_task_graph_into_exececution_graph.py b/tests/workflows/core/test_task_graph_into_exececution_graph.py index 75e825f..e48fc9a 100644 --- a/tests/workflows/core/test_task_graph_into_exececution_graph.py +++ b/tests/workflows/core/test_task_graph_into_exececution_graph.py @@ -22,22 +22,27 @@ from ... import mock def test_task_graph_into_execution_graph(): + operation_name = 'aria.interfaces.lifecycle.create' task_context = mock.context.simple() node = mock.models.get_dependency_node() node_instance = mock.models.get_dependency_node_instance() + deployment = mock.models.get_deployment() + execution = mock.models.get_execution() task_context.model.node.store(node) task_context.model.node_instance.store(node_instance) + task_context.model.deployment.store(deployment) + task_context.model.execution.store(execution) def sub_workflow(name, **_): return api.task_graph.TaskGraph(name) with context.workflow.current.push(task_context): test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph') - simple_before_task = api.task.OperationTask('test_simple_before_task', {}, node_instance) - simple_after_task = api.task.OperationTask('test_simple_after_task', {}, node_instance) + simple_before_task = api.task.NodeOperationTask(node_instance, operation_name) + simple_after_task = api.task.NodeOperationTask(node_instance, operation_name) inner_task_graph = api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph') - inner_task = api.task.OperationTask('test_inner_task', {}, node_instance) + inner_task = api.task.NodeOperationTask(node_instance, operation_name) inner_task_graph.add_tasks(inner_task) test_task_graph.add_tasks(simple_before_task) @@ -88,8 +93,8 @@ def test_task_graph_into_execution_graph(): def _assert_execution_is_api_task(execution_task, api_task): assert execution_task.id == api_task.id assert execution_task.name == api_task.name - 
assert execution_task.operation_details == api_task.operation_details - assert execution_task.node_instance == api_task.node_instance + assert execution_task.operation_mapping == api_task.operation_mapping + assert execution_task.actor == api_task.actor assert execution_task.inputs == api_task.inputs http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f3275612/tests/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/workflows/executor/test_executor.py b/tests/workflows/executor/test_executor.py index 0faa753..6c7551c 100644 --- a/tests/workflows/executor/test_executor.py +++ b/tests/workflows/executor/test_executor.py @@ -81,15 +81,15 @@ class TestExecutor(object): self.executor.close() -def mock_successful_task(): +def mock_successful_task(**_): pass -def mock_failing_task(): +def mock_failing_task(**_): raise MockException -def mock_task_with_input(input): +def mock_task_with_input(input, **_): raise MockException(input) if app: @@ -106,16 +106,17 @@ class MockTask(object): INFINITE_RETRIES = models.Task.INFINITE_RETRIES - def __init__(self, func, inputs=None): + def __init__(self, func, inputs=None, ctx=None): self.states = [] self.exception = None self.id = str(uuid.uuid4()) name = func.__name__ operation = 'tests.workflows.executor.test_executor.{name}'.format(name=name) - self.operation_details = {'operation': operation} + self.operation_mapping = operation self.logger = logging.getLogger() self.name = name self.inputs = inputs or {} + self.context = ctx or None self.retry_count = 0 self.max_attempts = 1
