http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/api/test_task_graph.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/api/test_task_graph.py b/tests/orchestrator/workflows/api/test_task_graph.py new file mode 100644 index 0000000..a569386 --- /dev/null +++ b/tests/orchestrator/workflows/api/test_task_graph.py @@ -0,0 +1,745 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.orchestrator.workflows.api import task_graph, task + + +class MockTask(task.BaseTask): + def __init__(self): + super(MockTask, self).__init__(ctx={}) + + +@pytest.fixture +def graph(): + return task_graph.TaskGraph(name='mock-graph') + + +class TestTaskGraphTasks(object): + + def test_add_task(self, graph): + task = MockTask() + add_result = graph.add_tasks(task) + assert add_result == [task] + tasks = [t for t in graph.tasks] + assert len(tasks) == 1 + assert tasks[0] == task + + def test_add_empty_group(self, graph): + result = graph.add_tasks([]) + assert result == [] + + def test_add_group(self, graph): + tasks = [MockTask(), MockTask(), MockTask()] + added_tasks = graph.add_tasks(*tasks) + assert added_tasks == tasks + + def test_add_partially_existing_group(self, graph): + task = MockTask() + graph.add_tasks(task) + tasks = [MockTask(), task, MockTask()] + added_tasks = graph.add_tasks(*tasks) + assert added_tasks == [tasks[0], tasks[2]] + + def test_add_recursively_group(self, graph): + recursive_group = [MockTask(), MockTask()] + tasks = [MockTask(), recursive_group, MockTask()] + added_tasks = graph.add_tasks(tasks) + assert added_tasks == [tasks[0], recursive_group[0], recursive_group[1], tasks[2]] + + def test_add_existing_task(self, graph): + task = MockTask() + graph.add_tasks(task) + # adding a task already in graph - should have no effect, and return False + add_result = graph.add_tasks(task) + assert add_result == [] + tasks = [t for t in graph.tasks] + assert len(tasks) == 1 + assert tasks[0] == task + + def test_remove_task(self, graph): + task = MockTask() + other_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(other_task) + graph.remove_tasks(other_task) + tasks = [t for t in graph.tasks] + assert len(tasks) == 1 + assert tasks[0] == task + + def test_remove_tasks_with_dependency(self, graph): + task = MockTask() + dependent_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(dependent_task) + graph.add_dependency(dependent_task, task) + remove_result = graph.remove_tasks(dependent_task) + assert remove_result == [dependent_task] + tasks = [t for t in graph.tasks] + assert len(tasks) == 1 + assert tasks[0] == task + # asserting no dependencies are left for the dependent task + assert len(list(graph.get_dependencies(task))) == 0 + + def test_remove_empty_group(self, graph): + result = graph.remove_tasks([]) + assert result == [] + + def test_remove_group(self, graph): + tasks = [MockTask(), MockTask(), MockTask()] + graph.add_tasks(*tasks) + removed_tasks = graph.remove_tasks(*tasks) + assert removed_tasks == tasks + + def test_remove_partially_existing_group(self, graph): + task = MockTask() + graph.add_tasks(task) + tasks = [MockTask(), task, MockTask()] + removed_tasks = graph.remove_tasks(*tasks) + assert removed_tasks == [task] + + def test_remove_recursively_group(self, graph): + recursive_group = [MockTask(), MockTask()] + tasks = [MockTask(), recursive_group, MockTask()] + graph.add_tasks(tasks) + removed_tasks = graph.remove_tasks(tasks) + assert removed_tasks == [tasks[0], recursive_group[0], recursive_group[1], tasks[2]] + + def test_remove_nonexistent_task(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + # removing a task not in graph - should have no effect, and return False + remove_result = graph.remove_tasks(task_not_in_graph) + assert remove_result == [] + tasks = [t for t in graph.tasks] + assert len(tasks) == 1 + assert tasks[0] == task + + def test_has_task(self, graph): + task = MockTask() + graph.add_tasks(task) + assert graph.has_tasks(task) is True + + def test_has_nonexistent_task(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + assert graph.has_tasks(task_not_in_graph) is False + + def test_has_empty_group(self, graph): + # the "empty task" is in the graph + assert graph.has_tasks([]) is True + + def test_has_group(self, graph): + tasks = [MockTask(), MockTask(), MockTask()] + graph.add_tasks(*tasks) + assert graph.has_tasks(*tasks) is True + + def test_has_partially_existing_group(self, graph): + task = MockTask() + graph.add_tasks(task) + tasks = [MockTask(), task, MockTask()] + assert graph.has_tasks(tasks) is False + + def test_has_recursively_group(self, graph): + recursive_group = [MockTask(), MockTask()] + tasks = [MockTask(), recursive_group, MockTask()] + graph.add_tasks(tasks) + assert graph.has_tasks(tasks) is True + + def test_get_task(self, graph): + task = MockTask() + graph.add_tasks(task) + assert graph.get_task(task.id) == task + + def test_get_nonexistent_task(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.get_task(task_not_in_graph.id) + + +class TestTaskGraphGraphTraversal(object): + + def test_tasks_iteration(self, graph): + task = MockTask() + other_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(other_task) + tasks = [t for t in graph.tasks] + assert set(tasks) == set([task, other_task]) + + def test_get_dependents(self, graph): + task = MockTask() + dependent_task_1 = MockTask() + dependent_task_2 = MockTask() + transitively_dependent_task = MockTask() + + graph.add_tasks(task) + graph.add_tasks(dependent_task_1) + graph.add_tasks(dependent_task_2) + graph.add_tasks(transitively_dependent_task) + + graph.add_dependency(dependent_task_1, task) + graph.add_dependency(dependent_task_2, task) + graph.add_dependency(transitively_dependent_task, dependent_task_2) + + dependent_tasks = list(graph.get_dependents(task)) + # transitively_dependent_task not expected to appear in the result + assert set(dependent_tasks) == set([dependent_task_1, dependent_task_2]) + + def test_get_task_empty_dependents(self, graph): + task = MockTask() + other_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(other_task) + dependent_tasks = list(graph.get_dependents(task)) + assert len(dependent_tasks) == 0 + + def test_get_nonexistent_task_dependents(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + list(graph.get_dependents(task_not_in_graph)) + + def test_get_dependencies(self, graph): + task = MockTask() + dependency_task_1 = MockTask() + dependency_task_2 = MockTask() + transitively_dependency_task = MockTask() + + graph.add_tasks(task) + graph.add_tasks(dependency_task_1) + graph.add_tasks(dependency_task_2) + graph.add_tasks(transitively_dependency_task) + + graph.add_dependency(task, dependency_task_1) + graph.add_dependency(task, dependency_task_2) + graph.add_dependency(dependency_task_2, transitively_dependency_task) + + dependency_tasks = list(graph.get_dependencies(task)) + # transitively_dependency_task not expected to appear in the result + assert set(dependency_tasks) == set([dependency_task_1, dependency_task_2]) + + def test_get_task_empty_dependencies(self, graph): + task = MockTask() + other_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(other_task) + dependency_tasks = list(graph.get_dependencies(task)) + assert len(dependency_tasks) == 0 + + def test_get_nonexistent_task_dependencies(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + list(graph.get_dependencies(task_not_in_graph)) + + +class TestTaskGraphDependencies(object): + + def test_add_dependency(self, graph): + task = MockTask() + dependency_task = MockTask() + unrelated_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(dependency_task) + graph.add_tasks(unrelated_task) + graph.add_dependency(task, dependency_task) + add_result = graph.has_dependency(task, dependency_task) + assert add_result is True + dependency_tasks = list(graph.get_dependencies(task)) + assert len(dependency_tasks) == 1 + assert dependency_tasks[0] == dependency_task + + def test_add_existing_dependency(self, graph): + task = MockTask() + dependency_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(dependency_task) + graph.add_dependency(task, dependency_task) + add_result = graph.has_dependency(task, dependency_task) + # adding a dependency already in graph - should have no effect, and return False + assert add_result is True + graph.add_dependency(task, dependency_task) + add_result = graph.has_dependency(task, dependency_task) + assert add_result is True + dependency_tasks = list(graph.get_dependencies(task)) + assert len(dependency_tasks) == 1 + assert dependency_tasks[0] == dependency_task + + def test_add_dependency_nonexistent_dependent(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.add_dependency(task_not_in_graph, task) + + def test_add_dependency_nonexistent_dependency(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.add_dependency(task, task_not_in_graph) + + def test_add_dependency_empty_dependent(self, graph): + task = MockTask() + graph.add_tasks(task) + # expecting add_dependency result to be False - no dependency has been created + assert set(graph.tasks) == set((task,)) + + def test_add_dependency_empty_dependency(self, graph): + task = MockTask() + graph.add_tasks(task) + # expecting add_dependency result to be False - no dependency has been created + assert set(graph.tasks) == set((task,)) + + def test_add_dependency_dependent_group(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + graph.add_dependency(group_tasks, task) + assert graph.has_dependency(group_tasks[0], task) is True + assert graph.has_dependency(group_tasks[1], task) is True + assert graph.has_dependency(group_tasks[2], task) is True + + def test_add_dependency_dependency_group(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + graph.add_dependency(task, group_tasks) + assert graph.has_dependency(task, group_tasks[0]) is True + assert graph.has_dependency(task, group_tasks[1]) is True + assert graph.has_dependency(task, group_tasks[2]) is True + + def test_add_dependency_between_groups(self, graph): + group_1_tasks = [MockTask() for _ in xrange(3)] + group_2_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(*group_1_tasks) + graph.add_tasks(*group_2_tasks) + graph.add_dependency(group_1_tasks, group_2_tasks) + for group_2_task in group_2_tasks: + assert graph.has_dependency(group_1_tasks[0], group_2_task) is True + assert graph.has_dependency(group_1_tasks[1], group_2_task) is True + assert graph.has_dependency(group_1_tasks[2], group_2_task) is True + + def test_add_dependency_dependency_group_with_some_existing_dependencies(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + # adding a dependency on a specific task manually, + # before adding a dependency on the whole parallel + graph.add_dependency(task, group_tasks[1]) + graph.add_dependency(task, group_tasks) + assert graph.has_dependency(task, group_tasks[0]) is True + assert graph.has_dependency(task, group_tasks[1]) is True + assert graph.has_dependency(task, group_tasks[2]) is True + + def test_add_existing_dependency_between_groups(self, graph): + group_1_tasks = [MockTask() for _ in xrange(3)] + group_2_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(*group_1_tasks) + graph.add_tasks(*group_2_tasks) + graph.add_dependency(group_1_tasks, group_2_tasks) + add_result = graph.has_dependency(group_1_tasks, group_2_tasks) + assert add_result is True + # adding a dependency already in graph - should have no effect, and return False + graph.add_dependency(group_1_tasks, group_2_tasks) + add_result = graph.has_dependency(group_1_tasks, group_2_tasks) + assert add_result is True + for group_2_task in group_2_tasks: + assert graph.has_dependency(group_1_tasks[0], group_2_task) is True + assert graph.has_dependency(group_1_tasks[1], group_2_task) is True + assert graph.has_dependency(group_1_tasks[2], group_2_task) is True + + def test_has_dependency(self, graph): + task = MockTask() + dependency_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(dependency_task) + graph.add_dependency(task, dependency_task) + assert graph.has_dependency(task, dependency_task) is True + + def test_has_nonexistent_dependency(self, graph): + task = MockTask() + other_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(other_task) + assert graph.has_dependency(task, other_task) is False + + def test_has_dependency_nonexistent_dependent(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.has_dependency(task_not_in_graph, task) + + def test_has_dependency_nonexistent_dependency(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.has_dependency(task, task_not_in_graph) + + def test_has_dependency_empty_dependent(self, graph): + task = MockTask() + graph.add_tasks(task) + # expecting has_dependency result to be False - dependency in an empty form + assert graph.has_dependency([], task) is False + + def test_has_dependency_empty_dependency(self, graph): + task = MockTask() + graph.add_tasks(task) + # expecting has_dependency result to be True - dependency in an empty form + assert graph.has_dependency(task, []) is False + + def test_has_dependency_dependent_group(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + assert graph.has_dependency(group_tasks, task) is False + graph.add_dependency(group_tasks, task) + assert graph.has_dependency(group_tasks, task) is True + + def test_has_dependency_dependency_parallel(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + assert graph.has_dependency(task, group_tasks) is False + graph.add_dependency(task, group_tasks) + assert graph.has_dependency(task, group_tasks) is True + + def test_has_dependency_between_groups(self, graph): + group_1_tasks = [MockTask() for _ in xrange(3)] + group_2_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(*group_1_tasks) + graph.add_tasks(*group_2_tasks) + assert graph.has_dependency(group_2_tasks, group_1_tasks) is False + graph.add_dependency(group_2_tasks, group_1_tasks) + assert graph.has_dependency(group_2_tasks, group_1_tasks) is True + + def test_has_dependency_dependency_parallel_with_some_existing_dependencies(self, graph): + task = MockTask() + parallel_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + parallel = graph.add_tasks(*parallel_tasks) + graph.add_dependency(task, parallel_tasks[1]) + # only a partial dependency exists - has_dependency is expected to return False + assert graph.has_dependency(task, parallel) is False + + def test_has_nonexistent_dependency_between_groups(self, graph): + group_1_tasks = [MockTask() for _ in xrange(3)] + group_2_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(*group_1_tasks) + graph.add_tasks(*group_2_tasks) + assert graph.has_dependency(group_1_tasks, group_2_tasks) is False + + def test_remove_dependency(self, graph): + task = MockTask() + dependency_task = MockTask() + another_dependent_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(dependency_task) + graph.add_tasks(another_dependent_task) + graph.add_dependency(task, dependency_task) + graph.add_dependency(another_dependent_task, dependency_task) + + graph.remove_dependency(task, dependency_task) + remove_result = graph.has_dependency(task, dependency_task) + assert remove_result is False + assert graph.has_dependency(task, dependency_task) is False + assert graph.has_dependency(another_dependent_task, dependency_task) is True + + def test_remove_nonexistent_dependency(self, graph): + task = MockTask() + dependency_task = MockTask() + graph.add_tasks(task) + graph.add_tasks(dependency_task) + # removing a dependency not in graph - should have no effect, and return False + graph.remove_dependency(task, dependency_task) + remove_result = graph.has_dependency(task, dependency_task) + assert remove_result is False + tasks = [t for t in graph.tasks] + assert set(tasks) == set([task, dependency_task]) + + def test_remove_dependency_nonexistent_dependent(self, graph): + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.remove_dependency(task_not_in_graph, task) + + def test_remove_dependency_nonexistent_dependency(self, graph): + # in this test the dependency *task* is not in the graph, not just the dependency itself + task = MockTask() + task_not_in_graph = MockTask() + graph.add_tasks(task) + with pytest.raises(task_graph.TaskNotInGraphError): + graph.remove_dependency(task, task_not_in_graph) + + def test_remove_dependency_empty_dependent(self, graph): + task = MockTask() + graph.add_tasks(task) + # expecting remove_dependency result to be False - no dependency has been created + graph.remove_dependency([], task) + assert set(graph.tasks) == set((task,)) + + def test_remove_dependency_empty_dependency(self, graph): + task = MockTask() + graph.add_tasks(task) + # expecting remove_dependency result to be False - no dependency has been created + graph.remove_dependency(task, []) + assert set(graph.tasks) == set((task,)) + + def test_remove_dependency_dependent_group(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + graph.add_dependency(group_tasks, task) + graph.remove_dependency(group_tasks, task) + remove_result = graph.has_dependency(group_tasks, task) + assert remove_result is False + assert graph.has_dependency(group_tasks[0], task) is False + assert graph.has_dependency(group_tasks[1], task) is False + assert graph.has_dependency(group_tasks[2], task) is False + + def test_remove_dependency_dependency_group(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + graph.add_dependency(task, group_tasks) + graph.remove_dependency(task, group_tasks) + remove_result = graph.has_dependency(task, group_tasks) + assert remove_result is False + assert graph.has_dependency(task, group_tasks[0]) is False + assert graph.has_dependency(task, group_tasks[1]) is False + assert graph.has_dependency(task, group_tasks[2]) is False + + def test_remove_dependency_between_groups(self, graph): + group_1_tasks = [MockTask() for _ in xrange(3)] + group_2_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(*group_1_tasks) + graph.add_tasks(*group_2_tasks) + graph.add_dependency(group_2_tasks, group_1_tasks) + graph.remove_dependency(group_2_tasks, group_1_tasks) + remove_result = graph.has_dependency(group_2_tasks, group_1_tasks) + assert remove_result is False + for group_2_task in group_2_tasks: + assert graph.has_dependency(group_2_task, group_1_tasks[0]) is False + assert graph.has_dependency(group_2_task, group_1_tasks[1]) is False + assert graph.has_dependency(group_2_task, group_1_tasks[2]) is False + + def test_remove_dependency_dependency_group_with_some_existing_dependencies(self, graph): + task = MockTask() + group_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(task) + graph.add_tasks(*group_tasks) + graph.add_dependency(task, group_tasks[1]) + graph.remove_dependency(task, group_tasks) + remove_result = graph.has_dependency(task, group_tasks) + # only a partial dependency exists - remove_dependency is expected to return False + assert remove_result is False + # no dependencies are expected to have changed + assert graph.has_dependency(task, group_tasks[0]) is False + assert graph.has_dependency(task, group_tasks[1]) is True + assert graph.has_dependency(task, group_tasks[2]) is False + + def test_remove_nonexistent_dependency_between_groups(self, graph): + group_1_tasks = [MockTask() for _ in xrange(3)] + group_2_tasks = [MockTask() for _ in xrange(3)] + graph.add_tasks(*group_1_tasks) + graph.add_tasks(*group_2_tasks) + # removing a dependency not in graph - should have no effect, and return False + graph.remove_dependency(group_2_tasks, group_1_tasks) + remove_result = graph.has_dependency(group_2_tasks, group_1_tasks) + assert remove_result is False + + # nested tests + + def test_group_with_nested_sequence(self, graph): + all_tasks = [MockTask() for _ in xrange(5)] + graph.add_tasks(all_tasks[0], + graph.sequence(all_tasks[1], all_tasks[2], all_tasks[3]), + all_tasks[4]) + assert set(graph.tasks) == set(all_tasks) + + # tasks[2] and tasks[3] should each have a single dependency; the rest should have none + assert len(list(graph.get_dependencies(all_tasks[0]))) == 0 + assert len(list(graph.get_dependencies(all_tasks[1]))) == 0 + assert set(graph.get_dependencies(all_tasks[2])) == set([all_tasks[1]]) + assert set(graph.get_dependencies(all_tasks[3])) == set([all_tasks[2]]) + assert len(list(graph.get_dependencies(all_tasks[4]))) == 0 + + def test_group_with_nested_group(self, graph): + tasks = [MockTask() for _ in xrange(5)] + graph.add_tasks(tasks[0], (tasks[1], tasks[2], tasks[3]), tasks[4]) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + # none of the tasks should have any dependencies + for i in xrange(len(tasks)): + assert len(list(graph.get_dependencies(tasks[i]))) == 0 + + def test_group_with_recursively_nested_group(self, graph): + recursively_nested_tasks = [MockTask(), MockTask(), MockTask()] + nested_tasks = [MockTask(), MockTask(), MockTask(), recursively_nested_tasks] + tasks = [MockTask(), MockTask(), MockTask(), nested_tasks] + graph.add_tasks(*tasks) + + assert set(graph.tasks) == set(tasks[:3] + nested_tasks[:3] + recursively_nested_tasks) + for tasks_list in [tasks, nested_tasks, recursively_nested_tasks]: + for i in xrange(len(tasks_list[:3])): + assert len(list(graph.get_dependencies(tasks_list[i]))) == 0 + + def test_group_with_recursively_nested_group_and_interdependencies(self, graph): + recursively_nested_tasks = [MockTask(), MockTask(), MockTask()] + nested_tasks = [MockTask(), MockTask(), MockTask(), recursively_nested_tasks] + tasks = [MockTask(), MockTask(), MockTask(), nested_tasks] + graph.add_tasks(*tasks) + + graph.add_dependency(tasks[2], nested_tasks[2]) + graph.add_dependency(nested_tasks[1], recursively_nested_tasks[0]) + graph.add_dependency(recursively_nested_tasks[1], tasks[0]) + + assert set(graph.tasks) == set(tasks[:3] + nested_tasks[:3] + recursively_nested_tasks) + assert set(graph.get_dependencies(tasks[0])) == set() + assert set(graph.get_dependencies(tasks[1])) == set() + assert set(graph.get_dependencies(tasks[2])) == set([nested_tasks[2]]) + + assert set(graph.get_dependencies(nested_tasks[0])) == set() + assert set(graph.get_dependencies(nested_tasks[1])) == set([recursively_nested_tasks[0]]) + assert set(graph.get_dependencies(nested_tasks[2])) == set() + + assert set(graph.get_dependencies(recursively_nested_tasks[0])) == set() + assert set(graph.get_dependencies(recursively_nested_tasks[1])) == set([tasks[0]]) + assert set(graph.get_dependencies(recursively_nested_tasks[2])) == set() + + +class TestTaskGraphSequence(object): + + def test_sequence(self, graph): + tasks = [MockTask(), MockTask(), MockTask()] + graph.sequence(*tasks) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + assert len(list(graph.get_dependencies(tasks[0]))) == 0 + assert set(graph.get_dependencies(tasks[1])) == set([tasks[0]]) + assert set(graph.get_dependencies(tasks[2])) == set([tasks[1]]) + + def test_sequence_with_some_tasks_and_dependencies_already_in_graph(self, graph): + # tests both that tasks which werent previously in graph get inserted, and + # that existing tasks don't get re-added to graph + tasks = [MockTask(), MockTask(), MockTask()] + # insert some tasks and dependencies to the graph + graph.add_tasks(tasks[1]) + graph.add_tasks(tasks[2]) + graph.add_dependency(tasks[2], tasks[1]) + + graph.sequence(*tasks) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + assert len(list(graph.get_dependencies(tasks[0]))) == 0 + assert set(graph.get_dependencies(tasks[1])) == set([tasks[0]]) + assert set(graph.get_dependencies(tasks[2])) == set([tasks[1]]) + + def test_sequence_with_nested_sequence(self, graph): + tasks = [MockTask() for _ in xrange(5)] + graph.sequence(tasks[0], graph.sequence(tasks[1], tasks[2], tasks[3]), tasks[4]) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + # first task should have no dependencies + assert len(list(graph.get_dependencies(tasks[0]))) == 0 + assert len(list(graph.get_dependencies(tasks[1]))) == 1 + assert len(list(graph.get_dependencies(tasks[2]))) == 2 + assert len(list(graph.get_dependencies(tasks[3]))) == 2 + assert len(list(graph.get_dependencies(tasks[4]))) == 3 + + def test_sequence_with_nested_group(self, graph): + tasks = [MockTask() for _ in xrange(5)] + graph.sequence(tasks[0], (tasks[1], tasks[2], tasks[3]), tasks[4]) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set(tasks) + # first task should have no dependencies + assert len(list(graph.get_dependencies(tasks[0]))) == 0 + # rest of the tasks (except last) should have a single dependency - the first task + for i in xrange(1, 4): + assert set(graph.get_dependencies(tasks[i])) == set([tasks[0]]) + # last task should have have a dependency on all tasks except for the first one + assert set(graph.get_dependencies(tasks[4])) == set([tasks[1], tasks[2], tasks[3]]) + + def test_sequence_with_recursively_nested_group(self, graph): + recursively_nested_group = [MockTask(), MockTask()] + nested_group = [MockTask(), recursively_nested_group, MockTask()] + sequence_tasks = [MockTask(), nested_group, MockTask()] + + graph.sequence(*sequence_tasks) + graph_tasks = [t for t in graph.tasks] + assert set(graph_tasks) == set([sequence_tasks[0], nested_group[0], + recursively_nested_group[0], recursively_nested_group[1], + nested_group[2], sequence_tasks[2]]) + + assert list(graph.get_dependencies(nested_group[0])) == [sequence_tasks[0]] + assert list(graph.get_dependencies(recursively_nested_group[0])) == [sequence_tasks[0]] + assert list(graph.get_dependencies(recursively_nested_group[1])) == [sequence_tasks[0]] + assert list(graph.get_dependencies(nested_group[2])) == [sequence_tasks[0]] + + assert list(graph.get_dependents(nested_group[0])) == [sequence_tasks[2]] + assert list(graph.get_dependents(recursively_nested_group[0])) == [sequence_tasks[2]] + assert list(graph.get_dependents(recursively_nested_group[1])) == [sequence_tasks[2]] + assert list(graph.get_dependents(nested_group[2])) == [sequence_tasks[2]] + + def test_sequence_with_empty_group(self, graph): + tasks = [MockTask(), [], MockTask()] + graph.sequence(*tasks) + graph_tasks = set([t for t in graph.tasks]) + assert graph_tasks == set([tasks[0], tasks[2]]) + assert list(graph.get_dependents(tasks[0])) == [tasks[2]] + assert list(graph.get_dependencies(tasks[2])) == [tasks[0]] + + def test_sequence_with_recursively_nested_sequence_and_interdependencies(self, graph): + recursively_nested_tasks = list(graph.sequence(MockTask(), MockTask(), MockTask())) + nested_tasks = list(graph.sequence(MockTask(), + MockTask(), + MockTask(), + recursively_nested_tasks)) + tasks = [MockTask(), MockTask(), MockTask(), nested_tasks] + graph.sequence(*tasks) + + assert set(graph.tasks) == set(tasks[:3] + nested_tasks[:3] + recursively_nested_tasks) + assert set(graph.get_dependencies(tasks[0])) == set() + for i in xrange(1, len(tasks[:-1])): + assert set(graph.get_dependencies(tasks[i])) == set([tasks[i - 1]]) + + assert set(graph.get_dependencies(nested_tasks[0])) == set([tasks[2]]) + for i in xrange(1, len(nested_tasks[:-1])): + assert set(graph.get_dependencies(nested_tasks[i])) == \ + set([tasks[2], nested_tasks[i-1]]) + + assert set(graph.get_dependencies(recursively_nested_tasks[0])) == \ + set([tasks[2], nested_tasks[2]]) + for i in xrange(1, len(recursively_nested_tasks[:-1])): + assert set(graph.get_dependencies(recursively_nested_tasks[i])) == \ + set([tasks[2], nested_tasks[2], recursively_nested_tasks[i-1]])
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/builtin/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/builtin/__init__.py b/tests/orchestrator/workflows/builtin/__init__.py new file mode 100644 index 0000000..e100432 --- /dev/null +++ b/tests/orchestrator/workflows/builtin/__init__.py @@ -0,0 +1,86 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from tests import mock + +def assert_node_install_operations(operations, with_relationships=False): + if with_relationships: + all_operations = [ + 'aria.interfaces.lifecycle.create', + 'aria.interfaces.relationship_lifecycle.preconfigure', + 'aria.interfaces.relationship_lifecycle.preconfigure', + 'aria.interfaces.lifecycle.configure', + 'aria.interfaces.relationship_lifecycle.postconfigure', + 'aria.interfaces.relationship_lifecycle.postconfigure', + 'aria.interfaces.lifecycle.start', + 'aria.interfaces.relationship_lifecycle.establish', + 'aria.interfaces.relationship_lifecycle.establish', + ] + + for i, operation in enumerate(operations): + assert operation.name.startswith(all_operations[i]) + else: + for i, operation in enumerate(operations): + assert operation.name.startswith(mock.operations.NODE_OPERATIONS_INSTALL[i]) + + +def assert_node_uninstall_operations(operations, with_relationships=False): + if with_relationships: + all_operations = [ + 'aria.interfaces.lifecycle.stop', + 'aria.interfaces.relationship_lifecycle.unlink', + 'aria.interfaces.relationship_lifecycle.unlink', + 'aria.interfaces.lifecycle.delete', + ] + + for i, operation in enumerate(operations): + assert operation.name.startswith(all_operations[i]) + else: + for i, operation in enumerate(operations): + assert operation.name.startswith(mock.operations.NODE_OPERATIONS_UNINSTALL[i]) + + +def ctx_with_basic_graph(): + """ + Create the following graph in storage: + dependency_node <------ dependent_node + :return: + """ + simple_context = mock.context.simple() + dependency_node = mock.models.get_dependency_node() + dependency_node_instance = mock.models.get_dependency_node_instance( + dependency_node=dependency_node) + + relationship = mock.models.get_relationship(dependency_node) + relationship_instance = mock.models.get_relationship_instance( + relationship=relationship, + target_instance=dependency_node_instance + ) + + dependent_node = mock.models.get_dependent_node(relationship) + dependent_node_instance = mock.models.get_dependent_node_instance( + dependent_node=dependent_node, + relationship_instance=relationship_instance + ) + + simple_context.model.node.store(dependent_node) + simple_context.model.node.store(dependency_node) + simple_context.model.node_instance.store(dependent_node_instance) + simple_context.model.node_instance.store(dependency_node_instance) + simple_context.model.relationship.store(relationship) + simple_context.model.relationship_instance.store(relationship_instance) + + return simple_context http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/builtin/test_execute_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/builtin/test_execute_operation.py b/tests/orchestrator/workflows/builtin/test_execute_operation.py new file mode 100644 index 0000000..83e0d4d --- /dev/null +++ b/tests/orchestrator/workflows/builtin/test_execute_operation.py @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.orchestrator.workflows.api import task +from aria.orchestrator.workflows.builtin.execute_operation import execute_operation + +from tests import mock +from . import ctx_with_basic_graph + + +@pytest.fixture +def ctx(): + return ctx_with_basic_graph() + + +def test_execute_operation(ctx): + operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0] + node_instance_id = 'dependency_node_instance' + + execute_tasks = list( + task.WorkflowTask( + execute_operation, + ctx=ctx, + operation=operation_name, + operation_kwargs={}, + allow_kwargs_override=False, + run_by_dependency_order=False, + type_names=[], + node_ids=[], + node_instance_ids=[node_instance_id] + ).topological_order() + ) + + assert len(execute_tasks) == 1 + assert execute_tasks[0].name == '{0}.{1}'.format(operation_name, node_instance_id) + +# TODO: add more scenarios http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/builtin/test_heal.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/builtin/test_heal.py b/tests/orchestrator/workflows/builtin/test_heal.py new file mode 100644 index 0000000..940194b --- /dev/null +++ b/tests/orchestrator/workflows/builtin/test_heal.py @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.orchestrator.workflows.api import task +from aria.orchestrator.workflows.builtin.heal import heal + +from . import (assert_node_install_operations, + assert_node_uninstall_operations, + ctx_with_basic_graph) + + +@pytest.fixture +def ctx(): + return ctx_with_basic_graph() + + +def test_heal_dependent_node(ctx): + heal_graph = task.WorkflowTask(heal, ctx=ctx, node_instance_id='dependent_node_instance') + + assert len(list(heal_graph.tasks)) == 2 + uninstall_subgraph, install_subgraph = list(heal_graph.topological_order(reverse=True)) + + assert len(list(uninstall_subgraph.tasks)) == 2 + dependent_node_subgraph_uninstall, dependency_node_subgraph_uninstall = \ + list(uninstall_subgraph.topological_order(reverse=True)) + + assert len(list(install_subgraph.tasks)) == 2 + dependency_node_subgraph_install, dependent_node_subgraph_install = \ + list(install_subgraph.topological_order(reverse=True)) + + dependent_node_uninstall_tasks = \ + list(dependent_node_subgraph_uninstall.topological_order(reverse=True)) + assert isinstance(dependency_node_subgraph_uninstall, task.StubTask) + dependent_node_install_tasks = \ + list(dependent_node_subgraph_install.topological_order(reverse=True)) + assert isinstance(dependency_node_subgraph_install, task.StubTask) + + assert_node_uninstall_operations(dependent_node_uninstall_tasks, with_relationships=True) + assert_node_install_operations(dependent_node_install_tasks, with_relationships=True) + + +def test_heal_dependency_node(ctx): + heal_graph = task.WorkflowTask(heal, ctx=ctx, node_instance_id='dependency_node_instance') + # both subgraphs should contain un\install for both the dependent and the dependency + assert len(list(heal_graph.tasks)) == 2 + uninstall_subgraph, install_subgraph = list(heal_graph.topological_order(reverse=True)) + + uninstall_tasks = list(uninstall_subgraph.topological_order(reverse=True)) + assert len(uninstall_tasks) == 4 + unlink_source, unlink_target = uninstall_tasks[:2] + dependent_node_subgraph_uninstall, dependency_node_subgraph_uninstall = uninstall_tasks[2:] + + install_tasks = list(install_subgraph.topological_order(reverse=True)) + assert len(install_tasks) == 4 + dependency_node_subgraph_install, dependent_node_subgraph_install = install_tasks[:2] + establish_source, establish_target = install_tasks[2:] + + assert isinstance(dependent_node_subgraph_uninstall, task.StubTask) + dependency_node_uninstall_tasks = \ + list(dependency_node_subgraph_uninstall.topological_order(reverse=True)) + assert isinstance(dependent_node_subgraph_install, task.StubTask) + dependency_node_install_tasks = \ + list(dependency_node_subgraph_install.topological_order(reverse=True)) + + assert unlink_source.name.startswith('aria.interfaces.relationship_lifecycle.unlink') + assert unlink_target.name.startswith('aria.interfaces.relationship_lifecycle.unlink') + assert_node_uninstall_operations(dependency_node_uninstall_tasks) + + assert_node_install_operations(dependency_node_install_tasks) + assert establish_source.name.startswith('aria.interfaces.relationship_lifecycle.establish') + assert establish_target.name.startswith('aria.interfaces.relationship_lifecycle.establish') + + +# TODO: add tests for contained in scenario http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/builtin/test_install.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/builtin/test_install.py b/tests/orchestrator/workflows/builtin/test_install.py new file mode 100644 index 0000000..3b23c5a --- /dev/null +++ b/tests/orchestrator/workflows/builtin/test_install.py @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.orchestrator.workflows.builtin.install import install +from aria.orchestrator.workflows.api import task + +from . import (assert_node_install_operations, + ctx_with_basic_graph) + + +@pytest.fixture +def ctx(): + return ctx_with_basic_graph() + + +def test_install(ctx): + install_tasks = list(task.WorkflowTask(install, ctx=ctx).topological_order(True)) + + assert len(install_tasks) == 2 + dependency_node_subgraph, dependent_node_subgraph = install_tasks + dependent_node_tasks = list(dependent_node_subgraph.topological_order(reverse=True)) + dependency_node_tasks = list(dependency_node_subgraph.topological_order(reverse=True)) + + assert_node_install_operations(dependency_node_tasks) + assert_node_install_operations(dependent_node_tasks, with_relationships=True) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/builtin/test_uninstall.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/builtin/test_uninstall.py b/tests/orchestrator/workflows/builtin/test_uninstall.py new file mode 100644 index 0000000..889e1d2 --- /dev/null +++ b/tests/orchestrator/workflows/builtin/test_uninstall.py @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.orchestrator.workflows.api import task +from aria.orchestrator.workflows.builtin.uninstall import uninstall + +from . import (assert_node_uninstall_operations, + ctx_with_basic_graph) + + +@pytest.fixture +def ctx(): + return ctx_with_basic_graph() + + +def test_uninstall(ctx): + uninstall_tasks = list(task.WorkflowTask(uninstall, ctx=ctx).topological_order(True)) + + assert len(uninstall_tasks) == 2 + dependent_node_subgraph, dependency_node_subgraph = uninstall_tasks + dependent_node_tasks = list(dependent_node_subgraph.topological_order(reverse=True)) + dependency_node_tasks = list(dependency_node_subgraph.topological_order(reverse=True)) + + assert_node_uninstall_operations(operations=dependency_node_tasks) + assert_node_uninstall_operations(operations=dependent_node_tasks, with_relationships=True) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/core/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/__init__.py b/tests/orchestrator/workflows/core/__init__.py new file mode 100644 index 0000000..ae1e83e --- /dev/null +++ b/tests/orchestrator/workflows/core/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py new file mode 100644 index 0000000..1b00bf6 --- /dev/null +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -0,0 +1,433 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import threading +from datetime import datetime + +import pytest + +import aria +from aria.orchestrator import ( + events, + workflow, + operation, + context +) +from aria.storage import models +from aria.orchestrator.workflows import ( + api, + exceptions, +) +from aria.orchestrator.workflows.core import engine +from aria.orchestrator.workflows.executor import thread + + +import tests.storage +from tests import mock + + +global_test_holder = {} + + +class BaseTest(object): + + @classmethod + def _execute(cls, workflow_func, workflow_context, executor): + eng = cls._engine(workflow_func=workflow_func, + workflow_context=workflow_context, + executor=executor) + eng.execute() + return eng + + @staticmethod + def _engine(workflow_func, workflow_context, executor): + graph = workflow_func(ctx=workflow_context) + return engine.Engine(executor=executor, + workflow_context=workflow_context, + tasks_graph=graph) + + @staticmethod + def _op(func, ctx, + inputs=None, + max_attempts=None, + retry_interval=None, + ignore_failure=None): + node_instance = ctx.model.node_instance.get('dependency_node_instance') + node_instance.node.operations['aria.interfaces.lifecycle.create'] = { + 'operation': '{name}.{func.__name__}'.format(name=__name__, func=func) + } + ctx.model.node_instance.store(node_instance) + return api.task.OperationTask.node_instance( + instance=node_instance, + name='aria.interfaces.lifecycle.create', + inputs=inputs, + max_attempts=max_attempts, + retry_interval=retry_interval, + ignore_failure=ignore_failure + ) + + @pytest.fixture(scope='function', autouse=True) + def globals_cleanup(self): + try: + yield + finally: + global_test_holder.clear() + + @pytest.fixture(scope='function', autouse=True) + def signals_registration(self, ): + def sent_task_handler(*args, **kwargs): + calls = global_test_holder.setdefault('sent_task_signal_calls', 0) + global_test_holder['sent_task_signal_calls'] = calls + 1 + + def start_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('start') + + def success_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('success') + + def failure_workflow_handler(workflow_context, exception, *args, **kwargs): + workflow_context.states.append('failure') + workflow_context.exception = exception + + def cancel_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('cancel') + + events.start_workflow_signal.connect(start_workflow_handler) + events.on_success_workflow_signal.connect(success_workflow_handler) + events.on_failure_workflow_signal.connect(failure_workflow_handler) + events.on_cancelled_workflow_signal.connect(cancel_workflow_handler) + events.sent_task_signal.connect(sent_task_handler) + try: + yield + finally: + events.start_workflow_signal.disconnect(start_workflow_handler) + events.on_success_workflow_signal.disconnect(success_workflow_handler) + events.on_failure_workflow_signal.disconnect(failure_workflow_handler) + events.on_cancelled_workflow_signal.disconnect(cancel_workflow_handler) + events.sent_task_signal.disconnect(sent_task_handler) + + @pytest.fixture(scope='function') + def executor(self): + result = thread.ThreadExecutor() + try: + yield result + finally: + result.close() + + @pytest.fixture(scope='function') + def workflow_context(self): + model_storage = aria.application_model_storage(tests.storage.InMemoryModelDriver()) + model_storage.setup() + blueprint = mock.models.get_blueprint() + deployment = mock.models.get_deployment() + model_storage.blueprint.store(blueprint) + model_storage.deployment.store(deployment) + node = mock.models.get_dependency_node() + node_instance = mock.models.get_dependency_node_instance(node) + model_storage.node.store(node) + model_storage.node_instance.store(node_instance) + result = context.workflow.WorkflowContext( + name='test', + model_storage=model_storage, + resource_storage=None, + deployment_id=deployment.id, + workflow_id='name') + result.states = [] + result.exception = None + return result + + +class TestEngine(BaseTest): + + def test_empty_graph_execution(self, workflow_context, executor): + @workflow + def mock_workflow(**_): + pass + self._execute(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert 'sent_task_signal_calls' not in global_test_holder + execution = workflow_context.execution + assert execution.started_at <= execution.ended_at <= datetime.utcnow() + assert execution.error is None + assert execution.status == models.Execution.TERMINATED + + def test_single_task_successful_execution(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + graph.add_tasks(self._op(mock_success_task, ctx)) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert global_test_holder.get('sent_task_signal_calls') == 1 + + def test_single_task_failed_execution(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + graph.add_tasks(self._op(mock_failed_task, ctx)) + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + assert global_test_holder.get('sent_task_signal_calls') == 1 + execution = workflow_context.execution + assert execution.started_at <= execution.ended_at <= datetime.utcnow() + assert execution.error is not None + assert execution.status == models.Execution.FAILED + + def test_two_tasks_execution_order(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op1 = self._op(mock_ordered_task, ctx, inputs={'counter': 1}) + op2 = self._op(mock_ordered_task, ctx, inputs={'counter': 2}) + graph.sequence(op1, op2) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert global_test_holder.get('invocations') == [1, 2] + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_stub_and_subworkflow_execution(self, workflow_context, executor): + @workflow + def sub_workflow(ctx, graph): + op1 = self._op(mock_ordered_task, ctx, inputs={'counter': 1}) + op2 = api.task.StubTask() + op3 = self._op(mock_ordered_task, ctx, inputs={'counter': 2}) + graph.sequence(op1, op2, op3) + + @workflow + def mock_workflow(ctx, graph): + graph.add_tasks(api.task.WorkflowTask(sub_workflow, ctx=ctx)) + self._execute(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert global_test_holder.get('invocations') == [1, 2] + assert global_test_holder.get('sent_task_signal_calls') == 2 + + +class TestCancel(BaseTest): + + def test_cancel_started_execution(self, workflow_context, executor): + number_of_tasks = 100 + + @workflow + def mock_workflow(ctx, graph): + return graph.sequence(*(self._op(mock_sleep_task, ctx, inputs={'seconds': 0.1}) + for _ in range(number_of_tasks))) + eng = self._engine(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + t = threading.Thread(target=eng.execute) + t.start() + time.sleep(1) + eng.cancel_execution() + t.join(timeout=30) + assert workflow_context.states == ['start', 'cancel'] + assert workflow_context.exception is None + invocations = global_test_holder.get('invocations', []) + assert 0 < len(invocations) < number_of_tasks + execution = workflow_context.execution + assert execution.started_at <= execution.ended_at <= datetime.utcnow() + assert execution.error is None + assert execution.status == models.Execution.CANCELLED + + def test_cancel_pending_execution(self, workflow_context, executor): + @workflow + def mock_workflow(graph, **_): + return graph + eng = self._engine(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + eng.cancel_execution() + execution = workflow_context.execution + assert execution.status == models.Execution.CANCELLED + + +class TestRetries(BaseTest): + + def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(mock_conditional_failure_task, ctx, + inputs={'failure_count': 1}, + max_attempts=2) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert len(global_test_holder.get('invocations', [])) == 2 + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(mock_conditional_failure_task, ctx, + inputs={'failure_count': 2}, + max_attempts=2) + graph.add_tasks(op) + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + assert len(global_test_holder.get('invocations', [])) == 2 + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(mock_conditional_failure_task, ctx, + inputs={'failure_count': 1}, + max_attempts=3) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert len(global_test_holder.get('invocations', [])) == 2 + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(mock_conditional_failure_task, ctx, + inputs={'failure_count': 2}, + max_attempts=3) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert len(global_test_holder.get('invocations', [])) == 3 + assert global_test_holder.get('sent_task_signal_calls') == 3 + + def test_infinite_retries(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(mock_conditional_failure_task, ctx, + inputs={'failure_count': 1}, + max_attempts=-1) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert len(global_test_holder.get('invocations', [])) == 2 + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_retry_interval_float(self, workflow_context, executor): + self._test_retry_interval(retry_interval=0.3, + workflow_context=workflow_context, + executor=executor) + + def test_retry_interval_int(self, workflow_context, executor): + self._test_retry_interval(retry_interval=1, + workflow_context=workflow_context, + executor=executor) + + def _test_retry_interval(self, retry_interval, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(mock_conditional_failure_task, ctx, + inputs={'failure_count': 1}, + max_attempts=2, + retry_interval=retry_interval) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 2 + invocation1, invocation2 = invocations + assert invocation2 - invocation1 >= retry_interval + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_ignore_failure(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(mock_conditional_failure_task, ctx, + ignore_failure=True, + inputs={'failure_count': 100}, + max_attempts=100) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 1 + assert global_test_holder.get('sent_task_signal_calls') == 1 + + +@operation +def mock_success_task(**_): + pass + + +@operation +def mock_failed_task(**_): + raise RuntimeError + + +@operation +def mock_ordered_task(counter, **_): + invocations = global_test_holder.setdefault('invocations', []) + invocations.append(counter) + + +@operation +def mock_conditional_failure_task(failure_count, **_): + invocations = global_test_holder.setdefault('invocations', []) + try: + if len(invocations) < failure_count: + raise RuntimeError + finally: + invocations.append(time.time()) + + +def mock_sleep_task(seconds, **_): + invocations = global_test_holder.setdefault('invocations', []) + invocations.append(time.time()) + time.sleep(seconds) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/core/test_task.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py new file mode 100644 index 0000000..6a4c8ac --- /dev/null +++ b/tests/orchestrator/workflows/core/test_task.py @@ -0,0 +1,113 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from datetime import ( + datetime, + timedelta +) + +import pytest + +from aria.orchestrator.context import workflow as workflow_context +from aria.orchestrator.workflows import ( + api, + core, + exceptions, +) + +from tests import mock + + +@pytest.fixture +def ctx(): + simple_context = mock.context.simple() + + blueprint = mock.models.get_blueprint() + deployment = mock.models.get_deployment() + node = mock.models.get_dependency_node() + node_instance = mock.models.get_dependency_node_instance(node) + execution = mock.models.get_execution() + + simple_context.model.blueprint.store(blueprint) + simple_context.model.deployment.store(deployment) + simple_context.model.node.store(node) + simple_context.model.node_instance.store(node_instance) + simple_context.model.execution.store(execution) + + return simple_context + + +class TestOperationTask(object): + + def _create_operation_task(self, ctx, node_instance): + with workflow_context.current.push(ctx): + api_task = api.task.OperationTask.node_instance( + instance=node_instance, + name='aria.interfaces.lifecycle.create', + ) + + core_task = core.task.OperationTask(api_task=api_task) + + return api_task, core_task + + def test_operation_task_creation(self, ctx): + node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) + api_task, core_task = self._create_operation_task(ctx, node_instance) + storage_task = ctx.model.task.get(core_task.id) + + assert core_task.model_task == storage_task + assert core_task.name == api_task.name + assert core_task.operation_mapping == api_task.operation_mapping + assert core_task.actor == api_task.actor == node_instance + assert core_task.inputs == api_task.inputs == storage_task.inputs + + def test_operation_task_edit_locked_attribute(self, ctx): + node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) + + _, core_task = self._create_operation_task(ctx, node_instance) + now = datetime.utcnow() + with pytest.raises(exceptions.TaskException): + core_task.status = core_task.STARTED + with pytest.raises(exceptions.TaskException): + core_task.started_at = now + with pytest.raises(exceptions.TaskException): + core_task.ended_at = now + with pytest.raises(exceptions.TaskException): + core_task.retry_count = 2 + with pytest.raises(exceptions.TaskException): + core_task.due_at = now + + def test_operation_task_edit_attributes(self, ctx): + node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID) + + _, core_task = self._create_operation_task(ctx, node_instance) + future_time = datetime.utcnow() + timedelta(seconds=3) + + with core_task._update(): + core_task.status = core_task.STARTED + core_task.started_at = future_time + core_task.ended_at = future_time + core_task.retry_count = 2 + core_task.eta = future_time + assert core_task.status != core_task.STARTED + assert core_task.started_at != future_time + assert core_task.ended_at != future_time + assert core_task.retry_count != 2 + assert core_task.due_at != future_time + + assert core_task.status == core_task.STARTED + assert core_task.started_at == future_time + assert core_task.ended_at == future_time + assert core_task.retry_count == 2 + assert core_task.eta == future_time http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py new file mode 100644 index 0000000..a179e49 --- /dev/null +++ b/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py @@ -0,0 +1,105 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from networkx import topological_sort, DiGraph + +from aria.orchestrator import context +from aria.orchestrator.workflows import api, core + +from tests import mock + + +def test_task_graph_into_execution_graph(): + operation_name = 'aria.interfaces.lifecycle.create' + task_context = mock.context.simple() + node = mock.models.get_dependency_node() + node_instance = mock.models.get_dependency_node_instance() + deployment = mock.models.get_deployment() + execution = mock.models.get_execution() + task_context.model.node.store(node) + task_context.model.node_instance.store(node_instance) + task_context.model.deployment.store(deployment) + task_context.model.execution.store(execution) + + def sub_workflow(name, **_): + return api.task_graph.TaskGraph(name) + + with context.workflow.current.push(task_context): + test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph') + simple_before_task = api.task.OperationTask.node_instance(instance=node_instance, + name=operation_name) + simple_after_task = api.task.OperationTask.node_instance(instance=node_instance, + name=operation_name) + + inner_task_graph = api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph') + inner_task = api.task.OperationTask.node_instance(instance=node_instance, + name=operation_name) + inner_task_graph.add_tasks(inner_task) + + test_task_graph.add_tasks(simple_before_task) + test_task_graph.add_tasks(simple_after_task) + test_task_graph.add_tasks(inner_task_graph) + test_task_graph.add_dependency(inner_task_graph, simple_before_task) + test_task_graph.add_dependency(simple_after_task, inner_task_graph) + + # Direct check + execution_graph = DiGraph() + core.translation.build_execution_graph(task_graph=test_task_graph, + execution_graph=execution_graph) + execution_tasks = topological_sort(execution_graph) + + assert len(execution_tasks) == 7 + + expected_tasks_names = [ + '{0}-Start'.format(test_task_graph.id), + simple_before_task.id, + '{0}-Start'.format(inner_task_graph.id), + inner_task.id, + '{0}-End'.format(inner_task_graph.id), + simple_after_task.id, + '{0}-End'.format(test_task_graph.id) + ] + + assert expected_tasks_names == execution_tasks + + assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph), + core.task.StartWorkflowTask) + + _assert_execution_is_api_task(_get_task_by_name(execution_tasks[1], execution_graph), + simple_before_task) + assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph), + core.task.StartSubWorkflowTask) + + _assert_execution_is_api_task(_get_task_by_name(execution_tasks[3], execution_graph), + inner_task) + assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph), + core.task.EndSubWorkflowTask) + + _assert_execution_is_api_task(_get_task_by_name(execution_tasks[5], execution_graph), + simple_after_task) + assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph), + core.task.EndWorkflowTask) + + +def _assert_execution_is_api_task(execution_task, api_task): + assert execution_task.id == api_task.id + assert execution_task.name == api_task.name + assert execution_task.operation_mapping == api_task.operation_mapping + assert execution_task.actor == api_task.actor + assert execution_task.inputs == api_task.inputs + + +def _get_task_by_name(task_name, graph): + return graph.node[task_name]['task'] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py new file mode 100644 index 0000000..ae1e83e --- /dev/null +++ b/tests/orchestrator/workflows/executor/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py new file mode 100644 index 0000000..a425799 --- /dev/null +++ b/tests/orchestrator/workflows/executor/test_executor.py @@ -0,0 +1,141 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import uuid +from contextlib import contextmanager + +import pytest +import retrying + +from aria.storage import models +from aria.orchestrator import events +from aria.orchestrator.workflows.executor import ( + thread, + multiprocess, + blocking, + # celery +) + +try: + import celery as _celery + app = _celery.Celery() + app.conf.update(CELERY_RESULT_BACKEND='amqp://') +except ImportError: + _celery = None + app = None + + +class TestExecutor(object): + + @pytest.mark.parametrize('executor_cls,executor_kwargs', [ + (thread.ThreadExecutor, {'pool_size': 1}), + (thread.ThreadExecutor, {'pool_size': 2}), + (multiprocess.MultiprocessExecutor, {'pool_size': 1}), + (multiprocess.MultiprocessExecutor, {'pool_size': 2}), + (blocking.CurrentThreadBlockingExecutor, {}), + # (celery.CeleryExecutor, {'app': app}) + ]) + def test_execute(self, executor_cls, executor_kwargs): + self.executor = executor_cls(**executor_kwargs) + expected_value = 'value' + successful_task = MockTask(mock_successful_task) + failing_task = MockTask(mock_failing_task) + task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value}) + + for task in [successful_task, failing_task, task_with_inputs]: + self.executor.execute(task) + + @retrying.retry(stop_max_delay=10000, wait_fixed=100) + def assertion(): + assert successful_task.states == ['start', 'success'] + assert failing_task.states == ['start', 'failure'] + assert task_with_inputs.states == ['start', 'failure'] + assert isinstance(failing_task.exception, MockException) + assert isinstance(task_with_inputs.exception, MockException) + assert task_with_inputs.exception.message == expected_value + assertion() + + def setup_method(self): + events.start_task_signal.connect(start_handler) + events.on_success_task_signal.connect(success_handler) + events.on_failure_task_signal.connect(failure_handler) + + def teardown_method(self): + events.start_task_signal.disconnect(start_handler) + events.on_success_task_signal.disconnect(success_handler) + events.on_failure_task_signal.disconnect(failure_handler) + if hasattr(self, 'executor'): + self.executor.close() + + +def mock_successful_task(**_): + pass + + +def mock_failing_task(**_): + raise MockException + + +def mock_task_with_input(input, **_): + raise MockException(input) + +if app: + mock_successful_task = app.task(mock_successful_task) + mock_failing_task = app.task(mock_failing_task) + mock_task_with_input = app.task(mock_task_with_input) + + +class MockException(Exception): + pass + + +class MockTask(object): + + INFINITE_RETRIES = models.Task.INFINITE_RETRIES + + def __init__(self, func, inputs=None, ctx=None): + self.states = [] + self.exception = None + self.id = str(uuid.uuid4()) + name = func.__name__ + operation = 'tests.orchestrator.workflows.executor.test_executor.{name}'.format(name=name) + self.operation_mapping = operation + self.logger = logging.getLogger() + self.name = name + self.inputs = inputs or {} + self.context = ctx or None + self.retry_count = 0 + self.max_attempts = 1 + + for state in models.Task.STATES: + setattr(self, state.upper(), state) + + @contextmanager + def _update(self): + yield self + + +def start_handler(task, *args, **kwargs): + task.states.append('start') + + +def success_handler(task, *args, **kwargs): + task.states.append('success') + + +def failure_handler(task, exception, *args, **kwargs): + task.states.append('failure') + task.exception = exception http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/storage/test_drivers.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_drivers.py b/tests/storage/test_drivers.py index 06e0e40..dccbe98 100644 --- a/tests/storage/test_drivers.py +++ b/tests/storage/test_drivers.py @@ -17,7 +17,7 @@ import os import pytest from aria.storage.drivers import FileSystemModelDriver, Driver, ModelDriver, ResourceDriver -from aria.exceptions import StorageError +from aria.storage.exceptions import StorageError from . import InMemoryModelDriver, TestFileSystem http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/storage/test_model_storage.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_model_storage.py b/tests/storage/test_model_storage.py index a06cd5f..17e11ae 100644 --- a/tests/storage/test_model_storage.py +++ b/tests/storage/test_model_storage.py @@ -20,8 +20,8 @@ from aria.storage import ( ModelStorage, models, ) -from aria.exceptions import StorageError from aria.storage import structures +from aria.storage.exceptions import StorageError from aria.storage.structures import Model, Field, PointerField from aria import application_model_storage http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/storage/test_models.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_models.py b/tests/storage/test_models.py index f2fce90..7e289e6 100644 --- a/tests/storage/test_models.py +++ b/tests/storage/test_models.py @@ -19,7 +19,7 @@ from datetime import datetime import pytest from aria.storage import Model, Field -from aria.exceptions import StorageError +from aria.storage.exceptions import StorageError from aria.storage.models import ( DeploymentUpdateStep, Relationship, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/storage/test_models_api.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_models_api.py b/tests/storage/test_models_api.py index 80826c3..2b92820 100644 --- a/tests/storage/test_models_api.py +++ b/tests/storage/test_models_api.py @@ -16,7 +16,7 @@ import pytest from aria.storage import _ModelApi, models -from aria.exceptions import StorageError +from aria.storage.exceptions import StorageError from . import InMemoryModelDriver http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/storage/test_resource_storage.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_resource_storage.py b/tests/storage/test_resource_storage.py index 9673a26..918b270 100644 --- a/tests/storage/test_resource_storage.py +++ b/tests/storage/test_resource_storage.py @@ -18,7 +18,7 @@ import tempfile import pytest -from aria.exceptions import StorageError +from aria.storage.exceptions import StorageError from aria.storage import ResourceStorage, FileSystemResourceDriver from . import TestFileSystem http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/test_logger.py ---------------------------------------------------------------------- diff --git a/tests/test_logger.py b/tests/test_logger.py index 37731bb..8c7a9af 100644 --- a/tests/test_logger.py +++ b/tests/test_logger.py @@ -23,7 +23,6 @@ from aria.logger import (create_logger, LoggerMixin, _DefaultConsoleFormat) - def test_create_logger(): logger = create_logger()