http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/api/test_task_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/api/test_task_graph.py 
b/tests/orchestrator/workflows/api/test_task_graph.py
new file mode 100644
index 0000000..a569386
--- /dev/null
+++ b/tests/orchestrator/workflows/api/test_task_graph.py
@@ -0,0 +1,745 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.orchestrator.workflows.api import task_graph, task
+
+
+class MockTask(task.BaseTask):
+    def __init__(self):
+        super(MockTask, self).__init__(ctx={})
+
+
+@pytest.fixture
+def graph():
+    return task_graph.TaskGraph(name='mock-graph')
+
+
+class TestTaskGraphTasks(object):
+
+    def test_add_task(self, graph):
+        task = MockTask()
+        add_result = graph.add_tasks(task)
+        assert add_result == [task]
+        tasks = [t for t in graph.tasks]
+        assert len(tasks) == 1
+        assert tasks[0] == task
+
+    def test_add_empty_group(self, graph):
+        result = graph.add_tasks([])
+        assert result == []
+
+    def test_add_group(self, graph):
+        tasks = [MockTask(), MockTask(), MockTask()]
+        added_tasks = graph.add_tasks(*tasks)
+        assert added_tasks == tasks
+
+    def test_add_partially_existing_group(self, graph):
+        task = MockTask()
+        graph.add_tasks(task)
+        tasks = [MockTask(), task, MockTask()]
+        added_tasks = graph.add_tasks(*tasks)
+        assert added_tasks == [tasks[0], tasks[2]]
+
+    def test_add_recursively_group(self, graph):
+        recursive_group = [MockTask(), MockTask()]
+        tasks = [MockTask(), recursive_group, MockTask()]
+        added_tasks = graph.add_tasks(tasks)
+        assert added_tasks == [tasks[0], recursive_group[0], 
recursive_group[1], tasks[2]]
+
+    def test_add_existing_task(self, graph):
+        task = MockTask()
+        graph.add_tasks(task)
+        # adding a task already in graph - should have no effect, and return an empty list
+        add_result = graph.add_tasks(task)
+        assert add_result == []
+        tasks = [t for t in graph.tasks]
+        assert len(tasks) == 1
+        assert tasks[0] == task
+
+    def test_remove_task(self, graph):
+        task = MockTask()
+        other_task = MockTask()
+        graph.add_tasks(task)
+        graph.add_tasks(other_task)
+        graph.remove_tasks(other_task)
+        tasks = [t for t in graph.tasks]
+        assert len(tasks) == 1
+        assert tasks[0] == task
+
+    def test_remove_tasks_with_dependency(self, graph):
+        task = MockTask()
+        dependent_task = MockTask()
+        graph.add_tasks(task)
+        graph.add_tasks(dependent_task)
+        graph.add_dependency(dependent_task, task)
+        remove_result = graph.remove_tasks(dependent_task)
+        assert remove_result == [dependent_task]
+        tasks = [t for t in graph.tasks]
+        assert len(tasks) == 1
+        assert tasks[0] == task
+        # asserting the remaining task is left with no dependencies
+        assert len(list(graph.get_dependencies(task))) == 0
+
+    def test_remove_empty_group(self, graph):
+        result = graph.remove_tasks([])
+        assert result == []
+
+    def test_remove_group(self, graph):
+        tasks = [MockTask(), MockTask(), MockTask()]
+        graph.add_tasks(*tasks)
+        removed_tasks = graph.remove_tasks(*tasks)
+        assert removed_tasks == tasks
+
+    def test_remove_partially_existing_group(self, graph):
+        task = MockTask()
+        graph.add_tasks(task)
+        tasks = [MockTask(), task, MockTask()]
+        removed_tasks = graph.remove_tasks(*tasks)
+        assert removed_tasks == [task]
+
+    def test_remove_recursively_group(self, graph):
+        recursive_group = [MockTask(), MockTask()]
+        tasks = [MockTask(), recursive_group, MockTask()]
+        graph.add_tasks(tasks)
+        removed_tasks = graph.remove_tasks(tasks)
+        assert removed_tasks == [tasks[0], recursive_group[0], 
recursive_group[1], tasks[2]]
+
+    def test_remove_nonexistent_task(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_tasks(task)
+        # removing a task not in graph - should have no effect, and return an empty list
+        remove_result = graph.remove_tasks(task_not_in_graph)
+        assert remove_result == []
+        tasks = [t for t in graph.tasks]
+        assert len(tasks) == 1
+        assert tasks[0] == task
+
+    def test_has_task(self, graph):
+        task = MockTask()
+        graph.add_tasks(task)
+        assert graph.has_tasks(task) is True
+
+    def test_has_nonexistent_task(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_tasks(task)
+        assert graph.has_tasks(task_not_in_graph) is False
+
+    def test_has_empty_group(self, graph):
+        # the "empty task" is in the graph
+        assert graph.has_tasks([]) is True
+
+    def test_has_group(self, graph):
+        tasks = [MockTask(), MockTask(), MockTask()]
+        graph.add_tasks(*tasks)
+        assert graph.has_tasks(*tasks) is True
+
+    def test_has_partially_existing_group(self, graph):
+        task = MockTask()
+        graph.add_tasks(task)
+        tasks = [MockTask(), task, MockTask()]
+        assert graph.has_tasks(tasks) is False
+
+    def test_has_recursively_group(self, graph):
+        recursive_group = [MockTask(), MockTask()]
+        tasks = [MockTask(), recursive_group, MockTask()]
+        graph.add_tasks(tasks)
+        assert graph.has_tasks(tasks) is True
+
+    def test_get_task(self, graph):
+        task = MockTask()
+        graph.add_tasks(task)
+        assert graph.get_task(task.id) == task
+
+    def test_get_nonexistent_task(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_tasks(task)
+        with pytest.raises(task_graph.TaskNotInGraphError):
+            graph.get_task(task_not_in_graph.id)
+
+
+class TestTaskGraphGraphTraversal(object):
+
+    def test_tasks_iteration(self, graph):
+        task = MockTask()
+        other_task = MockTask()
+        graph.add_tasks(task)
+        graph.add_tasks(other_task)
+        tasks = [t for t in graph.tasks]
+        assert set(tasks) == set([task, other_task])
+
+    def test_get_dependents(self, graph):
+        task = MockTask()
+        dependent_task_1 = MockTask()
+        dependent_task_2 = MockTask()
+        transitively_dependent_task = MockTask()
+
+        graph.add_tasks(task)
+        graph.add_tasks(dependent_task_1)
+        graph.add_tasks(dependent_task_2)
+        graph.add_tasks(transitively_dependent_task)
+
+        graph.add_dependency(dependent_task_1, task)
+        graph.add_dependency(dependent_task_2, task)
+        graph.add_dependency(transitively_dependent_task, dependent_task_2)
+
+        dependent_tasks = list(graph.get_dependents(task))
+        # transitively_dependent_task not expected to appear in the result
+        assert set(dependent_tasks) == set([dependent_task_1, 
dependent_task_2])
+
+    def test_get_task_empty_dependents(self, graph):
+        task = MockTask()
+        other_task = MockTask()
+        graph.add_tasks(task)
+        graph.add_tasks(other_task)
+        dependent_tasks = list(graph.get_dependents(task))
+        assert len(dependent_tasks) == 0
+
+    def test_get_nonexistent_task_dependents(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_tasks(task)
+        with pytest.raises(task_graph.TaskNotInGraphError):
+            list(graph.get_dependents(task_not_in_graph))
+
+    def test_get_dependencies(self, graph):
+        task = MockTask()
+        dependency_task_1 = MockTask()
+        dependency_task_2 = MockTask()
+        transitively_dependency_task = MockTask()
+
+        graph.add_tasks(task)
+        graph.add_tasks(dependency_task_1)
+        graph.add_tasks(dependency_task_2)
+        graph.add_tasks(transitively_dependency_task)
+
+        graph.add_dependency(task, dependency_task_1)
+        graph.add_dependency(task, dependency_task_2)
+        graph.add_dependency(dependency_task_2, transitively_dependency_task)
+
+        dependency_tasks = list(graph.get_dependencies(task))
+        # transitively_dependency_task not expected to appear in the result
+        assert set(dependency_tasks) == set([dependency_task_1, 
dependency_task_2])
+
+    def test_get_task_empty_dependencies(self, graph):
+        task = MockTask()
+        other_task = MockTask()
+        graph.add_tasks(task)
+        graph.add_tasks(other_task)
+        dependency_tasks = list(graph.get_dependencies(task))
+        assert len(dependency_tasks) == 0
+
+    def test_get_nonexistent_task_dependencies(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_tasks(task)
+        with pytest.raises(task_graph.TaskNotInGraphError):
+            list(graph.get_dependencies(task_not_in_graph))
+
+
+class TestTaskGraphDependencies(object):
+
+    def test_add_dependency(self, graph):
+        task = MockTask()
+        dependency_task = MockTask()
+        unrelated_task = MockTask()
+        graph.add_tasks(task)
+        graph.add_tasks(dependency_task)
+        graph.add_tasks(unrelated_task)
+        graph.add_dependency(task, dependency_task)
+        add_result = graph.has_dependency(task, dependency_task)
+        assert add_result is True
+        dependency_tasks = list(graph.get_dependencies(task))
+        assert len(dependency_tasks) == 1
+        assert dependency_tasks[0] == dependency_task
+
+    def test_add_existing_dependency(self, graph):
+        task = MockTask()
+        dependency_task = MockTask()
+        graph.add_tasks(task)
+        graph.add_tasks(dependency_task)
+        graph.add_dependency(task, dependency_task)
+        add_result = graph.has_dependency(task, dependency_task)
+        # the dependency now exists; adding it again below should have no effect
+        assert add_result is True
+        graph.add_dependency(task, dependency_task)
+        add_result = graph.has_dependency(task, dependency_task)
+        assert add_result is True
+        dependency_tasks = list(graph.get_dependencies(task))
+        assert len(dependency_tasks) == 1
+        assert dependency_tasks[0] == dependency_task
+
+    def test_add_dependency_nonexistent_dependent(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_tasks(task)
+        with pytest.raises(task_graph.TaskNotInGraphError):
+            graph.add_dependency(task_not_in_graph, task)
+
+    def test_add_dependency_nonexistent_dependency(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_tasks(task)
+        with pytest.raises(task_graph.TaskNotInGraphError):
+            graph.add_dependency(task, task_not_in_graph)
+
+    def test_add_dependency_empty_dependent(self, graph):
+        task = MockTask()
+        graph.add_tasks(task)
+        # NOTE(review): add_dependency is never called here - only the graph's tasks are checked
+        assert set(graph.tasks) == set((task,))
+
+    def test_add_dependency_empty_dependency(self, graph):
+        task = MockTask()
+        graph.add_tasks(task)
+        # NOTE(review): add_dependency is never called here - only the graph's tasks are checked
+        assert set(graph.tasks) == set((task,))
+
+    def test_add_dependency_dependent_group(self, graph):
+        task = MockTask()
+        group_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_tasks(task)
+        graph.add_tasks(*group_tasks)
+        graph.add_dependency(group_tasks, task)
+        assert graph.has_dependency(group_tasks[0], task) is True
+        assert graph.has_dependency(group_tasks[1], task) is True
+        assert graph.has_dependency(group_tasks[2], task) is True
+
+    def test_add_dependency_dependency_group(self, graph):
+        task = MockTask()
+        group_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_tasks(task)
+        graph.add_tasks(*group_tasks)
+        graph.add_dependency(task, group_tasks)
+        assert graph.has_dependency(task, group_tasks[0]) is True
+        assert graph.has_dependency(task, group_tasks[1]) is True
+        assert graph.has_dependency(task, group_tasks[2]) is True
+
+    def test_add_dependency_between_groups(self, graph):
+        group_1_tasks = [MockTask() for _ in xrange(3)]
+        group_2_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_tasks(*group_1_tasks)
+        graph.add_tasks(*group_2_tasks)
+        graph.add_dependency(group_1_tasks, group_2_tasks)
+        for group_2_task in group_2_tasks:
+            assert graph.has_dependency(group_1_tasks[0], group_2_task) is True
+            assert graph.has_dependency(group_1_tasks[1], group_2_task) is True
+            assert graph.has_dependency(group_1_tasks[2], group_2_task) is True
+
+    def 
test_add_dependency_dependency_group_with_some_existing_dependencies(self, 
graph):
+        task = MockTask()
+        group_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_tasks(task)
+        graph.add_tasks(*group_tasks)
+        # adding a dependency on a specific task manually,
+        # before adding a dependency on the whole group
+        graph.add_dependency(task, group_tasks[1])
+        graph.add_dependency(task, group_tasks)
+        assert graph.has_dependency(task, group_tasks[0]) is True
+        assert graph.has_dependency(task, group_tasks[1]) is True
+        assert graph.has_dependency(task, group_tasks[2]) is True
+
+    def test_add_existing_dependency_between_groups(self, graph):
+        group_1_tasks = [MockTask() for _ in xrange(3)]
+        group_2_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_tasks(*group_1_tasks)
+        graph.add_tasks(*group_2_tasks)
+        graph.add_dependency(group_1_tasks, group_2_tasks)
+        add_result = graph.has_dependency(group_1_tasks, group_2_tasks)
+        assert add_result is True
+        # adding a dependency already in graph - should have no effect
+        graph.add_dependency(group_1_tasks, group_2_tasks)
+        add_result = graph.has_dependency(group_1_tasks, group_2_tasks)
+        assert add_result is True
+        for group_2_task in group_2_tasks:
+            assert graph.has_dependency(group_1_tasks[0], group_2_task) is True
+            assert graph.has_dependency(group_1_tasks[1], group_2_task) is True
+            assert graph.has_dependency(group_1_tasks[2], group_2_task) is True
+
+    def test_has_dependency(self, graph):
+        task = MockTask()
+        dependency_task = MockTask()
+        graph.add_tasks(task)
+        graph.add_tasks(dependency_task)
+        graph.add_dependency(task, dependency_task)
+        assert graph.has_dependency(task, dependency_task) is True
+
+    def test_has_nonexistent_dependency(self, graph):
+        task = MockTask()
+        other_task = MockTask()
+        graph.add_tasks(task)
+        graph.add_tasks(other_task)
+        assert graph.has_dependency(task, other_task) is False
+
+    def test_has_dependency_nonexistent_dependent(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_tasks(task)
+        with pytest.raises(task_graph.TaskNotInGraphError):
+            graph.has_dependency(task_not_in_graph, task)
+
+    def test_has_dependency_nonexistent_dependency(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_tasks(task)
+        with pytest.raises(task_graph.TaskNotInGraphError):
+            graph.has_dependency(task, task_not_in_graph)
+
+    def test_has_dependency_empty_dependent(self, graph):
+        task = MockTask()
+        graph.add_tasks(task)
+        # expecting has_dependency result to be False - dependency in an empty 
form
+        assert graph.has_dependency([], task) is False
+
+    def test_has_dependency_empty_dependency(self, graph):
+        task = MockTask()
+        graph.add_tasks(task)
+        # expecting has_dependency result to be False - dependency in an empty form
+        assert graph.has_dependency(task, []) is False
+
+    def test_has_dependency_dependent_group(self, graph):
+        task = MockTask()
+        group_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_tasks(task)
+        graph.add_tasks(*group_tasks)
+        assert graph.has_dependency(group_tasks, task) is False
+        graph.add_dependency(group_tasks, task)
+        assert graph.has_dependency(group_tasks, task) is True
+
+    def test_has_dependency_dependency_parallel(self, graph):
+        task = MockTask()
+        group_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_tasks(task)
+        graph.add_tasks(*group_tasks)
+        assert graph.has_dependency(task, group_tasks) is False
+        graph.add_dependency(task, group_tasks)
+        assert graph.has_dependency(task, group_tasks) is True
+
+    def test_has_dependency_between_groups(self, graph):
+        group_1_tasks = [MockTask() for _ in xrange(3)]
+        group_2_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_tasks(*group_1_tasks)
+        graph.add_tasks(*group_2_tasks)
+        assert graph.has_dependency(group_2_tasks, group_1_tasks) is False
+        graph.add_dependency(group_2_tasks, group_1_tasks)
+        assert graph.has_dependency(group_2_tasks, group_1_tasks) is True
+
+    def 
test_has_dependency_dependency_parallel_with_some_existing_dependencies(self, 
graph):
+        task = MockTask()
+        parallel_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_tasks(task)
+        parallel = graph.add_tasks(*parallel_tasks)
+        graph.add_dependency(task, parallel_tasks[1])
+        # only a partial dependency exists - has_dependency is expected to 
return False
+        assert graph.has_dependency(task, parallel) is False
+
+    def test_has_nonexistent_dependency_between_groups(self, graph):
+        group_1_tasks = [MockTask() for _ in xrange(3)]
+        group_2_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_tasks(*group_1_tasks)
+        graph.add_tasks(*group_2_tasks)
+        assert graph.has_dependency(group_1_tasks, group_2_tasks) is False
+
+    def test_remove_dependency(self, graph):
+        task = MockTask()
+        dependency_task = MockTask()
+        another_dependent_task = MockTask()
+        graph.add_tasks(task)
+        graph.add_tasks(dependency_task)
+        graph.add_tasks(another_dependent_task)
+        graph.add_dependency(task, dependency_task)
+        graph.add_dependency(another_dependent_task, dependency_task)
+
+        graph.remove_dependency(task, dependency_task)
+        remove_result = graph.has_dependency(task, dependency_task)
+        assert remove_result is False
+        assert graph.has_dependency(task, dependency_task) is False
+        assert graph.has_dependency(another_dependent_task, dependency_task) 
is True
+
+    def test_remove_nonexistent_dependency(self, graph):
+        task = MockTask()
+        dependency_task = MockTask()
+        graph.add_tasks(task)
+        graph.add_tasks(dependency_task)
+        # removing a dependency not in graph - should have no effect
+        graph.remove_dependency(task, dependency_task)
+        remove_result = graph.has_dependency(task, dependency_task)
+        assert remove_result is False
+        tasks = [t for t in graph.tasks]
+        assert set(tasks) == set([task, dependency_task])
+
+    def test_remove_dependency_nonexistent_dependent(self, graph):
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_tasks(task)
+        with pytest.raises(task_graph.TaskNotInGraphError):
+            graph.remove_dependency(task_not_in_graph, task)
+
+    def test_remove_dependency_nonexistent_dependency(self, graph):
+        # in this test the dependency *task* is not in the graph, not just the 
dependency itself
+        task = MockTask()
+        task_not_in_graph = MockTask()
+        graph.add_tasks(task)
+        with pytest.raises(task_graph.TaskNotInGraphError):
+            graph.remove_dependency(task, task_not_in_graph)
+
+    def test_remove_dependency_empty_dependent(self, graph):
+        task = MockTask()
+        graph.add_tasks(task)
+        # removing an empty dependency should have no effect - the graph's tasks are unchanged
+        graph.remove_dependency([], task)
+        assert set(graph.tasks) == set((task,))
+
+    def test_remove_dependency_empty_dependency(self, graph):
+        task = MockTask()
+        graph.add_tasks(task)
+        # removing an empty dependency should have no effect - the graph's tasks are unchanged
+        graph.remove_dependency(task, [])
+        assert set(graph.tasks) == set((task,))
+
+    def test_remove_dependency_dependent_group(self, graph):
+        task = MockTask()
+        group_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_tasks(task)
+        graph.add_tasks(*group_tasks)
+        graph.add_dependency(group_tasks, task)
+        graph.remove_dependency(group_tasks, task)
+        remove_result = graph.has_dependency(group_tasks, task)
+        assert remove_result is False
+        assert graph.has_dependency(group_tasks[0], task) is False
+        assert graph.has_dependency(group_tasks[1], task) is False
+        assert graph.has_dependency(group_tasks[2], task) is False
+
+    def test_remove_dependency_dependency_group(self, graph):
+        task = MockTask()
+        group_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_tasks(task)
+        graph.add_tasks(*group_tasks)
+        graph.add_dependency(task, group_tasks)
+        graph.remove_dependency(task, group_tasks)
+        remove_result = graph.has_dependency(task, group_tasks)
+        assert remove_result is False
+        assert graph.has_dependency(task, group_tasks[0]) is False
+        assert graph.has_dependency(task, group_tasks[1]) is False
+        assert graph.has_dependency(task, group_tasks[2]) is False
+
+    def test_remove_dependency_between_groups(self, graph):
+        group_1_tasks = [MockTask() for _ in xrange(3)]
+        group_2_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_tasks(*group_1_tasks)
+        graph.add_tasks(*group_2_tasks)
+        graph.add_dependency(group_2_tasks, group_1_tasks)
+        graph.remove_dependency(group_2_tasks, group_1_tasks)
+        remove_result = graph.has_dependency(group_2_tasks, group_1_tasks)
+        assert remove_result is False
+        for group_2_task in group_2_tasks:
+            assert graph.has_dependency(group_2_task, group_1_tasks[0]) is 
False
+            assert graph.has_dependency(group_2_task, group_1_tasks[1]) is 
False
+            assert graph.has_dependency(group_2_task, group_1_tasks[2]) is 
False
+
+    def 
test_remove_dependency_dependency_group_with_some_existing_dependencies(self, 
graph):
+        task = MockTask()
+        group_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_tasks(task)
+        graph.add_tasks(*group_tasks)
+        graph.add_dependency(task, group_tasks[1])
+        graph.remove_dependency(task, group_tasks)
+        remove_result = graph.has_dependency(task, group_tasks)
+        # only a partial dependency exists - has_dependency for the group should be False
+        assert remove_result is False
+        # no dependencies are expected to have changed
+        assert graph.has_dependency(task, group_tasks[0]) is False
+        assert graph.has_dependency(task, group_tasks[1]) is True
+        assert graph.has_dependency(task, group_tasks[2]) is False
+
+    def test_remove_nonexistent_dependency_between_groups(self, graph):
+        group_1_tasks = [MockTask() for _ in xrange(3)]
+        group_2_tasks = [MockTask() for _ in xrange(3)]
+        graph.add_tasks(*group_1_tasks)
+        graph.add_tasks(*group_2_tasks)
+        # removing a dependency not in graph - should have no effect
+        graph.remove_dependency(group_2_tasks, group_1_tasks)
+        remove_result = graph.has_dependency(group_2_tasks, group_1_tasks)
+        assert remove_result is False
+
+    # nested tests
+
+    def test_group_with_nested_sequence(self, graph):
+        all_tasks = [MockTask() for _ in xrange(5)]
+        graph.add_tasks(all_tasks[0],
+                        graph.sequence(all_tasks[1], all_tasks[2], 
all_tasks[3]),
+                        all_tasks[4])
+        assert set(graph.tasks) == set(all_tasks)
+
+        # tasks[2] and tasks[3] should each have a single dependency; the rest 
should have none
+        assert len(list(graph.get_dependencies(all_tasks[0]))) == 0
+        assert len(list(graph.get_dependencies(all_tasks[1]))) == 0
+        assert set(graph.get_dependencies(all_tasks[2])) == set([all_tasks[1]])
+        assert set(graph.get_dependencies(all_tasks[3])) == set([all_tasks[2]])
+        assert len(list(graph.get_dependencies(all_tasks[4]))) == 0
+
+    def test_group_with_nested_group(self, graph):
+        tasks = [MockTask() for _ in xrange(5)]
+        graph.add_tasks(tasks[0], (tasks[1], tasks[2], tasks[3]), tasks[4])
+        graph_tasks = [t for t in graph.tasks]
+        assert set(graph_tasks) == set(tasks)
+        # none of the tasks should have any dependencies
+        for i in xrange(len(tasks)):
+            assert len(list(graph.get_dependencies(tasks[i]))) == 0
+
+    def test_group_with_recursively_nested_group(self, graph):
+        recursively_nested_tasks = [MockTask(), MockTask(), MockTask()]
+        nested_tasks = [MockTask(), MockTask(), MockTask(), 
recursively_nested_tasks]
+        tasks = [MockTask(), MockTask(), MockTask(), nested_tasks]
+        graph.add_tasks(*tasks)
+
+        assert set(graph.tasks) == set(tasks[:3] + nested_tasks[:3] + 
recursively_nested_tasks)
+        for tasks_list in [tasks, nested_tasks, recursively_nested_tasks]:
+            for i in xrange(len(tasks_list[:3])):
+                assert len(list(graph.get_dependencies(tasks_list[i]))) == 0
+
+    def test_group_with_recursively_nested_group_and_interdependencies(self, 
graph):
+        recursively_nested_tasks = [MockTask(), MockTask(), MockTask()]
+        nested_tasks = [MockTask(), MockTask(), MockTask(), 
recursively_nested_tasks]
+        tasks = [MockTask(), MockTask(), MockTask(), nested_tasks]
+        graph.add_tasks(*tasks)
+
+        graph.add_dependency(tasks[2], nested_tasks[2])
+        graph.add_dependency(nested_tasks[1], recursively_nested_tasks[0])
+        graph.add_dependency(recursively_nested_tasks[1], tasks[0])
+
+        assert set(graph.tasks) == set(tasks[:3] + nested_tasks[:3] + 
recursively_nested_tasks)
+        assert set(graph.get_dependencies(tasks[0])) == set()
+        assert set(graph.get_dependencies(tasks[1])) == set()
+        assert set(graph.get_dependencies(tasks[2])) == set([nested_tasks[2]])
+
+        assert set(graph.get_dependencies(nested_tasks[0])) == set()
+        assert set(graph.get_dependencies(nested_tasks[1])) == 
set([recursively_nested_tasks[0]])
+        assert set(graph.get_dependencies(nested_tasks[2])) == set()
+
+        assert set(graph.get_dependencies(recursively_nested_tasks[0])) == 
set()
+        assert set(graph.get_dependencies(recursively_nested_tasks[1])) == 
set([tasks[0]])
+        assert set(graph.get_dependencies(recursively_nested_tasks[2])) == 
set()
+
+
+class TestTaskGraphSequence(object):
+
+    def test_sequence(self, graph):
+        tasks = [MockTask(), MockTask(), MockTask()]
+        graph.sequence(*tasks)
+        graph_tasks = [t for t in graph.tasks]
+        assert set(graph_tasks) == set(tasks)
+        assert len(list(graph.get_dependencies(tasks[0]))) == 0
+        assert set(graph.get_dependencies(tasks[1])) == set([tasks[0]])
+        assert set(graph.get_dependencies(tasks[2])) == set([tasks[1]])
+
+    def test_sequence_with_some_tasks_and_dependencies_already_in_graph(self, 
graph):
+        # tests both that tasks which weren't previously in graph get inserted, and
+        # that existing tasks don't get re-added to graph
+        tasks = [MockTask(), MockTask(), MockTask()]
+        # insert some tasks and dependencies to the graph
+        graph.add_tasks(tasks[1])
+        graph.add_tasks(tasks[2])
+        graph.add_dependency(tasks[2], tasks[1])
+
+        graph.sequence(*tasks)
+        graph_tasks = [t for t in graph.tasks]
+        assert set(graph_tasks) == set(tasks)
+        assert len(list(graph.get_dependencies(tasks[0]))) == 0
+        assert set(graph.get_dependencies(tasks[1])) == set([tasks[0]])
+        assert set(graph.get_dependencies(tasks[2])) == set([tasks[1]])
+
+    def test_sequence_with_nested_sequence(self, graph):
+        tasks = [MockTask() for _ in xrange(5)]
+        graph.sequence(tasks[0], graph.sequence(tasks[1], tasks[2], tasks[3]), 
tasks[4])
+        graph_tasks = [t for t in graph.tasks]
+        assert set(graph_tasks) == set(tasks)
+        # first task should have no dependencies
+        assert len(list(graph.get_dependencies(tasks[0]))) == 0
+        assert len(list(graph.get_dependencies(tasks[1]))) == 1
+        assert len(list(graph.get_dependencies(tasks[2]))) == 2
+        assert len(list(graph.get_dependencies(tasks[3]))) == 2
+        assert len(list(graph.get_dependencies(tasks[4]))) == 3
+
+    def test_sequence_with_nested_group(self, graph):
+        tasks = [MockTask() for _ in xrange(5)]
+        graph.sequence(tasks[0], (tasks[1], tasks[2], tasks[3]), tasks[4])
+        graph_tasks = [t for t in graph.tasks]
+        assert set(graph_tasks) == set(tasks)
+        # first task should have no dependencies
+        assert len(list(graph.get_dependencies(tasks[0]))) == 0
+        # rest of the tasks (except last) should have a single dependency - 
the first task
+        for i in xrange(1, 4):
+            assert set(graph.get_dependencies(tasks[i])) == set([tasks[0]])
+        # last task should have a dependency on all tasks except for the first one
+        assert set(graph.get_dependencies(tasks[4])) == set([tasks[1], 
tasks[2], tasks[3]])
+
+    def test_sequence_with_recursively_nested_group(self, graph):
+        recursively_nested_group = [MockTask(), MockTask()]
+        nested_group = [MockTask(), recursively_nested_group, MockTask()]
+        sequence_tasks = [MockTask(), nested_group, MockTask()]
+
+        graph.sequence(*sequence_tasks)
+        graph_tasks = [t for t in graph.tasks]
+        assert set(graph_tasks) == set([sequence_tasks[0], nested_group[0],
+                                        recursively_nested_group[0], 
recursively_nested_group[1],
+                                        nested_group[2], sequence_tasks[2]])
+
+        assert list(graph.get_dependencies(nested_group[0])) == 
[sequence_tasks[0]]
+        assert list(graph.get_dependencies(recursively_nested_group[0])) == 
[sequence_tasks[0]]
+        assert list(graph.get_dependencies(recursively_nested_group[1])) == 
[sequence_tasks[0]]
+        assert list(graph.get_dependencies(nested_group[2])) == 
[sequence_tasks[0]]
+
+        assert list(graph.get_dependents(nested_group[0])) == 
[sequence_tasks[2]]
+        assert list(graph.get_dependents(recursively_nested_group[0])) == 
[sequence_tasks[2]]
+        assert list(graph.get_dependents(recursively_nested_group[1])) == 
[sequence_tasks[2]]
+        assert list(graph.get_dependents(nested_group[2])) == 
[sequence_tasks[2]]
+
+    def test_sequence_with_empty_group(self, graph):
+        tasks = [MockTask(), [], MockTask()]
+        graph.sequence(*tasks)
+        graph_tasks = set([t for t in graph.tasks])
+        assert graph_tasks == set([tasks[0], tasks[2]])
+        assert list(graph.get_dependents(tasks[0])) == [tasks[2]]
+        assert list(graph.get_dependencies(tasks[2])) == [tasks[0]]
+
+    def 
test_sequence_with_recursively_nested_sequence_and_interdependencies(self, 
graph):
+        recursively_nested_tasks = list(graph.sequence(MockTask(), MockTask(), 
MockTask()))
+        nested_tasks = list(graph.sequence(MockTask(),
+                                           MockTask(),
+                                           MockTask(),
+                                           recursively_nested_tasks))
+        tasks = [MockTask(), MockTask(), MockTask(), nested_tasks]
+        graph.sequence(*tasks)
+
+        assert set(graph.tasks) == set(tasks[:3] + nested_tasks[:3] + 
recursively_nested_tasks)
+        assert set(graph.get_dependencies(tasks[0])) == set()
+        for i in xrange(1, len(tasks[:-1])):
+            assert set(graph.get_dependencies(tasks[i])) == set([tasks[i - 1]])
+
+        assert set(graph.get_dependencies(nested_tasks[0])) == set([tasks[2]])
+        for i in xrange(1, len(nested_tasks[:-1])):
+            assert set(graph.get_dependencies(nested_tasks[i])) == \
+                   set([tasks[2], nested_tasks[i-1]])
+
+        assert set(graph.get_dependencies(recursively_nested_tasks[0])) == \
+               set([tasks[2], nested_tasks[2]])
+        for i in xrange(1, len(recursively_nested_tasks[:-1])):
+            assert set(graph.get_dependencies(recursively_nested_tasks[i])) == 
\
+                   set([tasks[2], nested_tasks[2], 
recursively_nested_tasks[i-1]])

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/builtin/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/builtin/__init__.py 
b/tests/orchestrator/workflows/builtin/__init__.py
new file mode 100644
index 0000000..e100432
--- /dev/null
+++ b/tests/orchestrator/workflows/builtin/__init__.py
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from tests import mock
+
+def assert_node_install_operations(operations, with_relationships=False):
+    """
+    Assert that every operation's name starts with the install operation name expected at
+    the same position.
+
+    Only the positions actually present in ``operations`` are checked; the number of
+    operations is not asserted.
+
+    :param operations: iterable of objects exposing a ``name`` string
+    :param with_relationships: if true, compare against the node lifecycle operations
+                               interleaved with the relationship lifecycle operations (each
+                               of those listed twice); otherwise compare against
+                               ``mock.operations.NODE_OPERATIONS_INSTALL``
+    """
+    if with_relationships:
+        expected_prefixes = [
+            'aria.interfaces.lifecycle.create',
+            'aria.interfaces.relationship_lifecycle.preconfigure',
+            'aria.interfaces.relationship_lifecycle.preconfigure',
+            'aria.interfaces.lifecycle.configure',
+            'aria.interfaces.relationship_lifecycle.postconfigure',
+            'aria.interfaces.relationship_lifecycle.postconfigure',
+            'aria.interfaces.lifecycle.start',
+            'aria.interfaces.relationship_lifecycle.establish',
+            'aria.interfaces.relationship_lifecycle.establish',
+        ]
+    else:
+        expected_prefixes = mock.operations.NODE_OPERATIONS_INSTALL
+
+    for position, operation in enumerate(operations):
+        assert operation.name.startswith(expected_prefixes[position])
+
+
+def assert_node_uninstall_operations(operations, with_relationships=False):
+    """
+    Assert that every operation's name starts with the uninstall operation name expected at
+    the same position.
+
+    Only the positions actually present in ``operations`` are checked; the number of
+    operations is not asserted.
+
+    :param operations: iterable of objects exposing a ``name`` string
+    :param with_relationships: if true, compare against stop, two relationship unlinks and
+                               delete; otherwise compare against
+                               ``mock.operations.NODE_OPERATIONS_UNINSTALL``
+    """
+    if with_relationships:
+        expected_prefixes = [
+            'aria.interfaces.lifecycle.stop',
+            'aria.interfaces.relationship_lifecycle.unlink',
+            'aria.interfaces.relationship_lifecycle.unlink',
+            'aria.interfaces.lifecycle.delete',
+        ]
+    else:
+        expected_prefixes = mock.operations.NODE_OPERATIONS_UNINSTALL
+
+    for position, operation in enumerate(operations):
+        assert operation.name.startswith(expected_prefixes[position])
+
+
+def ctx_with_basic_graph():
+    """
+    Build a simple mock context whose model storage holds the following graph:
+    dependency_node <------ dependent_node
+
+    :return: the context created by ``mock.context.simple()``, with both nodes, both node
+             instances, the relationship and the relationship instance stored in its model
+    """
+    basic_ctx = mock.context.simple()
+
+    # The relationship targets the dependency node, so that side is created first.
+    target_node = mock.models.get_dependency_node()
+    target_instance = mock.models.get_dependency_node_instance(dependency_node=target_node)
+
+    relationship = mock.models.get_relationship(target_node)
+    relationship_instance = mock.models.get_relationship_instance(
+        relationship=relationship, target_instance=target_instance)
+
+    source_node = mock.models.get_dependent_node(relationship)
+    source_instance = mock.models.get_dependent_node_instance(
+        dependent_node=source_node, relationship_instance=relationship_instance)
+
+    basic_ctx.model.node.store(source_node)
+    basic_ctx.model.node.store(target_node)
+    basic_ctx.model.node_instance.store(source_instance)
+    basic_ctx.model.node_instance.store(target_instance)
+    basic_ctx.model.relationship.store(relationship)
+    basic_ctx.model.relationship_instance.store(relationship_instance)
+
+    return basic_ctx

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/builtin/test_execute_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/builtin/test_execute_operation.py 
b/tests/orchestrator/workflows/builtin/test_execute_operation.py
new file mode 100644
index 0000000..83e0d4d
--- /dev/null
+++ b/tests/orchestrator/workflows/builtin/test_execute_operation.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.orchestrator.workflows.api import task
+from aria.orchestrator.workflows.builtin.execute_operation import 
execute_operation
+
+from tests import mock
+from . import ctx_with_basic_graph
+
+
+@pytest.fixture
+def ctx():
+    """Provide the context built by ``ctx_with_basic_graph`` to the requesting test."""
+    basic_graph_ctx = ctx_with_basic_graph()
+    return basic_graph_ctx
+
+
+def test_execute_operation(ctx):
+    """
+    Running ``execute_operation`` for a single node instance id should yield exactly one
+    task, named ``<operation name>.<node instance id>``.
+    """
+    op_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
+    instance_id = 'dependency_node_instance'
+
+    workflow_task = task.WorkflowTask(
+        execute_operation,
+        ctx=ctx,
+        operation=op_name,
+        operation_kwargs={},
+        allow_kwargs_override=False,
+        run_by_dependency_order=False,
+        type_names=[],
+        node_ids=[],
+        node_instance_ids=[instance_id]
+    )
+    execute_tasks = list(workflow_task.topological_order())
+
+    assert len(execute_tasks) == 1
+    expected_task_name = '{0}.{1}'.format(op_name, instance_id)
+    assert execute_tasks[0].name == expected_task_name
+
+# TODO: add more scenarios

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/builtin/test_heal.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/builtin/test_heal.py 
b/tests/orchestrator/workflows/builtin/test_heal.py
new file mode 100644
index 0000000..940194b
--- /dev/null
+++ b/tests/orchestrator/workflows/builtin/test_heal.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.orchestrator.workflows.api import task
+from aria.orchestrator.workflows.builtin.heal import heal
+
+from . import (assert_node_install_operations,
+               assert_node_uninstall_operations,
+               ctx_with_basic_graph)
+
+
+@pytest.fixture
+def ctx():
+    """Provide the context built by ``ctx_with_basic_graph`` to the requesting test."""
+    basic_graph_ctx = ctx_with_basic_graph()
+    return basic_graph_ctx
+
+
+def test_heal_dependent_node(ctx):
+    """
+    Healing the dependent node instance: the uninstall and install sub-graphs each hold two
+    tasks, a stub for the dependency node and a real sub-graph for the dependent node.
+    """
+    heal_graph = task.WorkflowTask(heal, ctx=ctx, node_instance_id='dependent_node_instance')
+
+    assert len(list(heal_graph.tasks)) == 2
+    uninstall_subgraph, install_subgraph = heal_graph.topological_order(reverse=True)
+
+    assert len(list(uninstall_subgraph.tasks)) == 2
+    dependent_uninstall, dependency_uninstall = \
+        uninstall_subgraph.topological_order(reverse=True)
+
+    assert len(list(install_subgraph.tasks)) == 2
+    dependency_install, dependent_install = install_subgraph.topological_order(reverse=True)
+
+    uninstall_operations = list(dependent_uninstall.topological_order(reverse=True))
+    assert isinstance(dependency_uninstall, task.StubTask)
+    install_operations = list(dependent_install.topological_order(reverse=True))
+    assert isinstance(dependency_install, task.StubTask)
+
+    assert_node_uninstall_operations(uninstall_operations, with_relationships=True)
+    assert_node_install_operations(install_operations, with_relationships=True)
+
+
+def test_heal_dependency_node(ctx):
+    """
+    Healing the dependency node instance: each of the uninstall and install sub-graphs holds
+    four tasks - two relationship tasks, a stub for the dependent node and a real sub-graph
+    for the dependency node.
+    """
+    unlink_prefix = 'aria.interfaces.relationship_lifecycle.unlink'
+    establish_prefix = 'aria.interfaces.relationship_lifecycle.establish'
+
+    heal_graph = task.WorkflowTask(heal, ctx=ctx, node_instance_id='dependency_node_instance')
+    # both sub-graphs should cover the dependent as well as the dependency node
+    assert len(list(heal_graph.tasks)) == 2
+    uninstall_subgraph, install_subgraph = heal_graph.topological_order(reverse=True)
+
+    uninstall_tasks = list(uninstall_subgraph.topological_order(reverse=True))
+    assert len(uninstall_tasks) == 4
+    unlink_source, unlink_target, dependent_uninstall, dependency_uninstall = uninstall_tasks
+
+    install_tasks = list(install_subgraph.topological_order(reverse=True))
+    assert len(install_tasks) == 4
+    dependency_install, dependent_install, establish_source, establish_target = install_tasks
+
+    assert isinstance(dependent_uninstall, task.StubTask)
+    uninstall_operations = list(dependency_uninstall.topological_order(reverse=True))
+    assert isinstance(dependent_install, task.StubTask)
+    install_operations = list(dependency_install.topological_order(reverse=True))
+
+    for unlink_task in (unlink_source, unlink_target):
+        assert unlink_task.name.startswith(unlink_prefix)
+    assert_node_uninstall_operations(uninstall_operations)
+
+    assert_node_install_operations(install_operations)
+    for establish_task in (establish_source, establish_target):
+        assert establish_task.name.startswith(establish_prefix)
+
+
+# TODO: add tests for the contained-in scenario

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/builtin/test_install.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/builtin/test_install.py 
b/tests/orchestrator/workflows/builtin/test_install.py
new file mode 100644
index 0000000..3b23c5a
--- /dev/null
+++ b/tests/orchestrator/workflows/builtin/test_install.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.orchestrator.workflows.builtin.install import install
+from aria.orchestrator.workflows.api import task
+
+from . import (assert_node_install_operations,
+               ctx_with_basic_graph)
+
+
+@pytest.fixture
+def ctx():
+    """Provide the context built by ``ctx_with_basic_graph`` to the requesting test."""
+    basic_graph_ctx = ctx_with_basic_graph()
+    return basic_graph_ctx
+
+
+def test_install(ctx):
+    """
+    The install workflow should yield two node sub-graphs: the dependency node's, holding
+    the plain install operations, and the dependent node's, which also holds the
+    relationship operations.
+    """
+    install_workflow = task.WorkflowTask(install, ctx=ctx)
+    install_tasks = list(install_workflow.topological_order(True))
+
+    assert len(install_tasks) == 2
+    dependency_subgraph, dependent_subgraph = install_tasks
+    dependent_operations = list(dependent_subgraph.topological_order(reverse=True))
+    dependency_operations = list(dependency_subgraph.topological_order(reverse=True))
+
+    assert_node_install_operations(dependency_operations)
+    assert_node_install_operations(dependent_operations, with_relationships=True)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/builtin/test_uninstall.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/builtin/test_uninstall.py 
b/tests/orchestrator/workflows/builtin/test_uninstall.py
new file mode 100644
index 0000000..889e1d2
--- /dev/null
+++ b/tests/orchestrator/workflows/builtin/test_uninstall.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.orchestrator.workflows.api import task
+from aria.orchestrator.workflows.builtin.uninstall import uninstall
+
+from . import (assert_node_uninstall_operations,
+               ctx_with_basic_graph)
+
+
+@pytest.fixture
+def ctx():
+    """Provide the context built by ``ctx_with_basic_graph`` to the requesting test."""
+    basic_graph_ctx = ctx_with_basic_graph()
+    return basic_graph_ctx
+
+
+def test_uninstall(ctx):
+    """
+    The uninstall workflow should yield two node sub-graphs: the dependent node's, which
+    also holds the relationship operations, and the dependency node's, holding the plain
+    uninstall operations.
+    """
+    uninstall_workflow = task.WorkflowTask(uninstall, ctx=ctx)
+    uninstall_tasks = list(uninstall_workflow.topological_order(True))
+
+    assert len(uninstall_tasks) == 2
+    dependent_subgraph, dependency_subgraph = uninstall_tasks
+    dependent_operations = list(dependent_subgraph.topological_order(reverse=True))
+    dependency_operations = list(dependency_subgraph.topological_order(reverse=True))
+
+    assert_node_uninstall_operations(operations=dependency_operations)
+    assert_node_uninstall_operations(operations=dependent_operations,
+                                     with_relationships=True)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/core/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/__init__.py 
b/tests/orchestrator/workflows/core/__init__.py
new file mode 100644
index 0000000..ae1e83e
--- /dev/null
+++ b/tests/orchestrator/workflows/core/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py 
b/tests/orchestrator/workflows/core/test_engine.py
new file mode 100644
index 0000000..1b00bf6
--- /dev/null
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -0,0 +1,433 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import threading
+from datetime import datetime
+
+import pytest
+
+import aria
+from aria.orchestrator import (
+    events,
+    workflow,
+    operation,
+    context
+)
+from aria.storage import models
+from aria.orchestrator.workflows import (
+    api,
+    exceptions,
+)
+from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.executor import thread
+
+
+import tests.storage
+from tests import mock
+
+
+# Scratch state shared between the tests in this module and the signal handlers / mock
+# operations they trigger (keys used below: 'sent_task_signal_calls' and 'invocations').
+# Emptied after every test by ``BaseTest.globals_cleanup``.
+global_test_holder = {}
+
+
+class BaseTest(object):
+    """
+    Shared helpers and fixtures for the engine tests in this module.
+
+    Offers engine construction/execution helpers, an operation-task factory bound to the
+    stored ``dependency_node_instance``, and fixtures for the executor, the workflow
+    context, signal-handler registration and ``global_test_holder`` cleanup.
+    """
+
+    @classmethod
+    def _execute(cls, workflow_func, workflow_context, executor):
+        """Create an engine through ``_engine``, call its ``execute()`` and return it."""
+        created_engine = cls._engine(workflow_func=workflow_func,
+                                     workflow_context=workflow_context,
+                                     executor=executor)
+        created_engine.execute()
+        return created_engine
+
+    @staticmethod
+    def _engine(workflow_func, workflow_context, executor):
+        """Return an ``engine.Engine`` over the graph returned by ``workflow_func``."""
+        tasks_graph = workflow_func(ctx=workflow_context)
+        return engine.Engine(executor=executor,
+                             workflow_context=workflow_context,
+                             tasks_graph=tasks_graph)
+
+    @staticmethod
+    def _op(func, ctx,
+            inputs=None,
+            max_attempts=None,
+            retry_interval=None,
+            ignore_failure=None):
+        """
+        Point the ``aria.interfaces.lifecycle.create`` operation of the stored
+        ``dependency_node_instance`` at ``func`` and return an operation task for it.
+
+        :param func: function of this module to register as the operation implementation
+        :param ctx: context whose model storage holds the node instance
+        :param inputs: passed through to the created operation task
+        :param max_attempts: passed through to the created operation task
+        :param retry_interval: passed through to the created operation task
+        :param ignore_failure: passed through to the created operation task
+        :return: the result of ``api.task.OperationTask.node_instance``
+        """
+        operation_name = 'aria.interfaces.lifecycle.create'
+
+        node_instance = ctx.model.node_instance.get('dependency_node_instance')
+        # The operation is registered by dotted path: this module's name plus the function name.
+        operation_path = '{name}.{func.__name__}'.format(name=__name__, func=func)
+        node_instance.node.operations[operation_name] = {'operation': operation_path}
+        ctx.model.node_instance.store(node_instance)
+
+        return api.task.OperationTask.node_instance(
+            instance=node_instance,
+            name=operation_name,
+            inputs=inputs,
+            max_attempts=max_attempts,
+            retry_interval=retry_interval,
+            ignore_failure=ignore_failure
+        )
+
+    @pytest.fixture(scope='function', autouse=True)
+    def globals_cleanup(self):
+        """Empty ``global_test_holder`` once the test is over, even if it failed."""
+        try:
+            yield
+        finally:
+            global_test_holder.clear()
+
+    @pytest.fixture(scope='function', autouse=True)
+    def signals_registration(self):
+        """
+        Connect recording handlers to the workflow and sent-task signals for the duration of
+        the test and disconnect them afterwards.
+
+        Workflow handlers append a state name to ``workflow_context.states``; the sent-task
+        handler counts calls under ``global_test_holder['sent_task_signal_calls']``.
+        """
+        def sent_task_handler(*args, **kwargs):
+            previous_calls = global_test_holder.get('sent_task_signal_calls', 0)
+            global_test_holder['sent_task_signal_calls'] = previous_calls + 1
+
+        def start_workflow_handler(workflow_context, *args, **kwargs):
+            workflow_context.states.append('start')
+
+        def success_workflow_handler(workflow_context, *args, **kwargs):
+            workflow_context.states.append('success')
+
+        def failure_workflow_handler(workflow_context, exception, *args, **kwargs):
+            workflow_context.states.append('failure')
+            workflow_context.exception = exception
+
+        def cancel_workflow_handler(workflow_context, *args, **kwargs):
+            workflow_context.states.append('cancel')
+
+        registrations = (
+            (events.start_workflow_signal, start_workflow_handler),
+            (events.on_success_workflow_signal, success_workflow_handler),
+            (events.on_failure_workflow_signal, failure_workflow_handler),
+            (events.on_cancelled_workflow_signal, cancel_workflow_handler),
+            (events.sent_task_signal, sent_task_handler),
+        )
+        for signal, handler in registrations:
+            signal.connect(handler)
+        try:
+            yield
+        finally:
+            for signal, handler in registrations:
+                signal.disconnect(handler)
+
+    @pytest.fixture(scope='function')
+    def executor(self):
+        """Yield a ``thread.ThreadExecutor`` and close it once the test is over."""
+        thread_executor = thread.ThreadExecutor()
+        try:
+            yield thread_executor
+        finally:
+            thread_executor.close()
+
+    @pytest.fixture(scope='function')
+    def workflow_context(self):
+        """
+        Return a ``WorkflowContext`` backed by an in-memory model storage that holds a
+        blueprint, a deployment and the dependency node with its instance.
+
+        The context additionally carries ``states`` (empty list) and ``exception`` (``None``)
+        attributes, which the signal handlers of this class fill in.
+        """
+        storage = aria.application_model_storage(tests.storage.InMemoryModelDriver())
+        storage.setup()
+
+        blueprint = mock.models.get_blueprint()
+        deployment = mock.models.get_deployment()
+        storage.blueprint.store(blueprint)
+        storage.deployment.store(deployment)
+
+        dependency_node = mock.models.get_dependency_node()
+        dependency_node_instance = mock.models.get_dependency_node_instance(dependency_node)
+        storage.node.store(dependency_node)
+        storage.node_instance.store(dependency_node_instance)
+
+        created_context = context.workflow.WorkflowContext(
+            name='test',
+            model_storage=storage,
+            resource_storage=None,
+            deployment_id=deployment.id,
+            workflow_id='name')
+        created_context.states = []
+        created_context.exception = None
+        return created_context
+
+
+class TestEngine(BaseTest):
+    """Basic engine execution: empty graph, single task, ordering and sub-workflows."""
+
+    def test_empty_graph_execution(self, workflow_context, executor):
+        """A workflow without tasks succeeds, sends no task and ends as TERMINATED."""
+        @workflow
+        def mock_workflow(**_):
+            """Add nothing to the graph."""
+
+        self._execute(workflow_func=mock_workflow,
+                      workflow_context=workflow_context,
+                      executor=executor)
+
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert 'sent_task_signal_calls' not in global_test_holder
+        execution = workflow_context.execution
+        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
+        assert execution.error is None
+        assert execution.status == models.Execution.TERMINATED
+
+    def test_single_task_successful_execution(self, workflow_context, executor):
+        """One succeeding task: the workflow succeeds and exactly one task is sent."""
+        @workflow
+        def mock_workflow(ctx, graph):
+            success_op = self._op(mock_success_task, ctx)
+            graph.add_tasks(success_op)
+
+        self._execute(workflow_func=mock_workflow,
+                      workflow_context=workflow_context,
+                      executor=executor)
+
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert global_test_holder.get('sent_task_signal_calls') == 1
+
+    def test_single_task_failed_execution(self, workflow_context, executor):
+        """One failing task: execution raises ``ExecutorException`` and ends as FAILED."""
+        @workflow
+        def mock_workflow(ctx, graph):
+            failing_op = self._op(mock_failed_task, ctx)
+            graph.add_tasks(failing_op)
+
+        with pytest.raises(exceptions.ExecutorException):
+            self._execute(workflow_func=mock_workflow,
+                          workflow_context=workflow_context,
+                          executor=executor)
+
+        assert workflow_context.states == ['start', 'failure']
+        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+        assert global_test_holder.get('sent_task_signal_calls') == 1
+        execution = workflow_context.execution
+        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
+        assert execution.error is not None
+        assert execution.status == models.Execution.FAILED
+
+    def test_two_tasks_execution_order(self, workflow_context, executor):
+        """Two sequenced tasks are invoked in the order of the sequence."""
+        @workflow
+        def mock_workflow(ctx, graph):
+            first_op = self._op(mock_ordered_task, ctx, inputs={'counter': 1})
+            second_op = self._op(mock_ordered_task, ctx, inputs={'counter': 2})
+            graph.sequence(first_op, second_op)
+
+        self._execute(workflow_func=mock_workflow,
+                      workflow_context=workflow_context,
+                      executor=executor)
+
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert global_test_holder.get('invocations') == [1, 2]
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_stub_and_subworkflow_execution(self, workflow_context, executor):
+        """
+        A sub-workflow with a stub task between two operations: both operations run in
+        order and only two tasks are sent.
+        """
+        @workflow
+        def sub_workflow(ctx, graph):
+            first_op = self._op(mock_ordered_task, ctx, inputs={'counter': 1})
+            stub = api.task.StubTask()
+            last_op = self._op(mock_ordered_task, ctx, inputs={'counter': 2})
+            graph.sequence(first_op, stub, last_op)
+
+        @workflow
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(api.task.WorkflowTask(sub_workflow, ctx=ctx))
+
+        self._execute(workflow_func=mock_workflow,
+                      workflow_context=workflow_context,
+                      executor=executor)
+
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert global_test_holder.get('invocations') == [1, 2]
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+
+class TestCancel(BaseTest):
+    """Cancelling an execution while it is running and before it has started."""
+
+    def test_cancel_started_execution(self, workflow_context, executor):
+        """
+        Cancel one second into a sequence of 100 tasks sleeping 0.1s each: some, but not
+        all, tasks were invoked and the execution ends as CANCELLED without an error.
+        """
+        number_of_tasks = 100
+
+        @workflow
+        def mock_workflow(ctx, graph):
+            sleep_ops = [self._op(mock_sleep_task, ctx, inputs={'seconds': 0.1})
+                         for _ in range(number_of_tasks)]
+            return graph.sequence(*sleep_ops)
+
+        eng = self._engine(workflow_func=mock_workflow,
+                           workflow_context=workflow_context,
+                           executor=executor)
+        # Run the engine in the background so it can be cancelled from this thread.
+        engine_thread = threading.Thread(target=eng.execute)
+        engine_thread.start()
+        time.sleep(1)
+        eng.cancel_execution()
+        engine_thread.join(timeout=30)
+
+        assert workflow_context.states == ['start', 'cancel']
+        assert workflow_context.exception is None
+        invocations = global_test_holder.get('invocations', [])
+        assert 0 < len(invocations) < number_of_tasks
+        execution = workflow_context.execution
+        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
+        assert execution.error is None
+        assert execution.status == models.Execution.CANCELLED
+
+    def test_cancel_pending_execution(self, workflow_context, executor):
+        """Cancelling an engine that was never executed marks the execution CANCELLED."""
+        @workflow
+        def mock_workflow(graph, **_):
+            return graph
+
+        eng = self._engine(workflow_func=mock_workflow,
+                           workflow_context=workflow_context,
+                           executor=executor)
+        eng.cancel_execution()
+
+        assert workflow_context.execution.status == models.Execution.CANCELLED
+
+
+class TestRetries(BaseTest):
+    """Engine behaviour for ``max_attempts``, ``retry_interval`` and ``ignore_failure``."""
+
+    def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor):
+        """One failure with two attempts allowed: success after two invocations."""
+        @workflow
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(self._op(mock_conditional_failure_task, ctx,
+                                     inputs={'failure_count': 1},
+                                     max_attempts=2))
+
+        self._execute(workflow_func=mock_workflow,
+                      workflow_context=workflow_context,
+                      executor=executor)
+
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert len(global_test_holder.get('invocations', [])) == 2
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor):
+        """Two failures with two attempts allowed: the workflow fails after two invocations."""
+        @workflow
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(self._op(mock_conditional_failure_task, ctx,
+                                     inputs={'failure_count': 2},
+                                     max_attempts=2))
+
+        with pytest.raises(exceptions.ExecutorException):
+            self._execute(workflow_func=mock_workflow,
+                          workflow_context=workflow_context,
+                          executor=executor)
+
+        assert workflow_context.states == ['start', 'failure']
+        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+        assert len(global_test_holder.get('invocations', [])) == 2
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor):
+        """One failure with three attempts allowed: success after two invocations."""
+        @workflow
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(self._op(mock_conditional_failure_task, ctx,
+                                     inputs={'failure_count': 1},
+                                     max_attempts=3))
+
+        self._execute(workflow_func=mock_workflow,
+                      workflow_context=workflow_context,
+                      executor=executor)
+
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert len(global_test_holder.get('invocations', [])) == 2
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor):
+        """Two failures with three attempts allowed: success after three invocations."""
+        @workflow
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(self._op(mock_conditional_failure_task, ctx,
+                                     inputs={'failure_count': 2},
+                                     max_attempts=3))
+
+        self._execute(workflow_func=mock_workflow,
+                      workflow_context=workflow_context,
+                      executor=executor)
+
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert len(global_test_holder.get('invocations', [])) == 3
+        assert global_test_holder.get('sent_task_signal_calls') == 3
+
+    def test_infinite_retries(self, workflow_context, executor):
+        """One failure with ``max_attempts=-1``: success after two invocations."""
+        @workflow
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(self._op(mock_conditional_failure_task, ctx,
+                                     inputs={'failure_count': 1},
+                                     max_attempts=-1))
+
+        self._execute(workflow_func=mock_workflow,
+                      workflow_context=workflow_context,
+                      executor=executor)
+
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert len(global_test_holder.get('invocations', [])) == 2
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_retry_interval_float(self, workflow_context, executor):
+        """Run the retry-interval scenario with a float interval (0.3)."""
+        self._test_retry_interval(retry_interval=0.3,
+                                  workflow_context=workflow_context,
+                                  executor=executor)
+
+    def test_retry_interval_int(self, workflow_context, executor):
+        """Run the retry-interval scenario with an integer interval (1)."""
+        self._test_retry_interval(retry_interval=1,
+                                  workflow_context=workflow_context,
+                                  executor=executor)
+
+    def _test_retry_interval(self, retry_interval, workflow_context, executor):
+        """
+        One failure with two attempts allowed: the two recorded invocation times must be at
+        least ``retry_interval`` apart.
+        """
+        @workflow
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(self._op(mock_conditional_failure_task, ctx,
+                                     inputs={'failure_count': 1},
+                                     max_attempts=2,
+                                     retry_interval=retry_interval))
+
+        self._execute(workflow_func=mock_workflow,
+                      workflow_context=workflow_context,
+                      executor=executor)
+
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        invocations = global_test_holder.get('invocations', [])
+        assert len(invocations) == 2
+        first_invocation, second_invocation = invocations
+        assert second_invocation - first_invocation >= retry_interval
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_ignore_failure(self, workflow_context, executor):
+        """
+        With ``ignore_failure=True`` an always-failing task is invoked once, is not retried
+        and does not fail the workflow.
+        """
+        @workflow
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(self._op(mock_conditional_failure_task, ctx,
+                                     ignore_failure=True,
+                                     inputs={'failure_count': 100},
+                                     max_attempts=100))
+
+        self._execute(workflow_func=mock_workflow,
+                      workflow_context=workflow_context,
+                      executor=executor)
+
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        invocations = global_test_holder.get('invocations', [])
+        assert len(invocations) == 1
+        assert global_test_holder.get('sent_task_signal_calls') == 1
+
+
+@operation
+def mock_success_task(**_):
+    """Mock operation that accepts any keyword arguments and does nothing."""
+    return None
+
+
+@operation
+def mock_failed_task(**_):
+    """Mock operation that always fails by raising ``RuntimeError``."""
+    raise RuntimeError()
+
+
+@operation
+def mock_ordered_task(counter, **_):
+    """
+    Mock operation that appends ``counter`` to ``global_test_holder['invocations']``,
+    creating the list on first use.
+    """
+    global_test_holder.setdefault('invocations', []).append(counter)
+
+
+@operation
+def mock_conditional_failure_task(failure_count, **_):
+    """
+    Mock operation that fails while fewer than ``failure_count`` invocations were recorded.
+
+    Every call, failing or not, appends the current time to
+    ``global_test_holder['invocations']``.
+
+    :param failure_count: number of earlier invocations required before the call succeeds
+    """
+    invocations = global_test_holder.setdefault('invocations', [])
+    # Decide before recording, so the current call is not counted against itself.
+    should_fail = len(invocations) < failure_count
+    invocations.append(time.time())
+    if should_fail:
+        raise RuntimeError
+
+
+@operation
+def mock_sleep_task(seconds, **_):
+    """
+    Mock operation that records its invocation time in
+    ``global_test_holder['invocations']`` and then sleeps.
+
+    Decorated with ``@operation`` like every other mock operation in this module.
+
+    :param seconds: how long to sleep, in seconds
+    """
+    invocations = global_test_holder.setdefault('invocations', [])
+    invocations.append(time.time())
+    time.sleep(seconds)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/core/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task.py 
b/tests/orchestrator/workflows/core/test_task.py
new file mode 100644
index 0000000..6a4c8ac
--- /dev/null
+++ b/tests/orchestrator/workflows/core/test_task.py
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from datetime import (
+    datetime,
+    timedelta
+)
+
+import pytest
+
+from aria.orchestrator.context import workflow as workflow_context
+from aria.orchestrator.workflows import (
+    api,
+    core,
+    exceptions,
+)
+
+from tests import mock
+
+
+@pytest.fixture
+def ctx():
+    simple_context = mock.context.simple()
+
+    blueprint = mock.models.get_blueprint()
+    deployment = mock.models.get_deployment()
+    node = mock.models.get_dependency_node()
+    node_instance = mock.models.get_dependency_node_instance(node)
+    execution = mock.models.get_execution()
+
+    simple_context.model.blueprint.store(blueprint)
+    simple_context.model.deployment.store(deployment)
+    simple_context.model.node.store(node)
+    simple_context.model.node_instance.store(node_instance)
+    simple_context.model.execution.store(execution)
+
+    return simple_context
+
+
+class TestOperationTask(object):  # core OperationTask: creation and locked-attribute edits
+
+    def _create_operation_task(self, ctx, node_instance):  # returns (api_task, core_task)
+        with workflow_context.current.push(ctx):  # both tasks are built with ctx pushed
+            api_task = api.task.OperationTask.node_instance(
+                instance=node_instance,
+                name='aria.interfaces.lifecycle.create',
+            )
+
+            core_task = core.task.OperationTask(api_task=api_task)
+
+        return api_task, core_task
+
+    def test_operation_task_creation(self, ctx):  # core task must mirror api and stored task
+        node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID)
+        api_task, core_task = self._create_operation_task(ctx, node_instance)
+        storage_task = ctx.model.task.get(core_task.id)
+
+        assert core_task.model_task == storage_task
+        assert core_task.name == api_task.name
+        assert core_task.operation_mapping == api_task.operation_mapping
+        assert core_task.actor == api_task.actor == node_instance
+        assert core_task.inputs == api_task.inputs == storage_task.inputs
+
+    def test_operation_task_edit_locked_attribute(self, ctx):  # edits outside _update() raise
+        node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID)
+
+        _, core_task = self._create_operation_task(ctx, node_instance)
+        now = datetime.utcnow()
+        with pytest.raises(exceptions.TaskException):
+            core_task.status = core_task.STARTED
+        with pytest.raises(exceptions.TaskException):
+            core_task.started_at = now
+        with pytest.raises(exceptions.TaskException):
+            core_task.ended_at = now
+        with pytest.raises(exceptions.TaskException):
+            core_task.retry_count = 2
+        with pytest.raises(exceptions.TaskException):
+            core_task.due_at = now
+
+    def test_operation_task_edit_attributes(self, ctx):  # edits show up once _update() exits
+        node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID)
+
+        _, core_task = self._create_operation_task(ctx, node_instance)
+        future_time = datetime.utcnow() + timedelta(seconds=3)
+
+        with core_task._update():
+            core_task.status = core_task.STARTED
+            core_task.started_at = future_time
+            core_task.ended_at = future_time
+            core_task.retry_count = 2
+            core_task.due_at = future_time  # same attribute the due_at assertions check
+            assert core_task.status != core_task.STARTED  # not visible inside the context
+            assert core_task.started_at != future_time
+            assert core_task.ended_at != future_time
+            assert core_task.retry_count != 2
+            assert core_task.due_at != future_time
+
+        assert core_task.status == core_task.STARTED
+        assert core_task.started_at == future_time
+        assert core_task.ended_at == future_time
+        assert core_task.retry_count == 2
+        assert core_task.due_at == future_time

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py
new file mode 100644
index 0000000..a179e49
--- /dev/null
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_exececution_graph.py
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from networkx import topological_sort, DiGraph
+
+from aria.orchestrator import context
+from aria.orchestrator.workflows import api, core
+
+from tests import mock
+
+
+def test_task_graph_into_execution_graph():
+    operation_name = 'aria.interfaces.lifecycle.create'
+    task_context = mock.context.simple()
+    node = mock.models.get_dependency_node()
+    node_instance = mock.models.get_dependency_node_instance()
+    deployment = mock.models.get_deployment()
+    execution = mock.models.get_execution()
+    task_context.model.node.store(node)
+    task_context.model.node_instance.store(node_instance)
+    task_context.model.deployment.store(deployment)
+    task_context.model.execution.store(execution)
+
+    def sub_workflow(name, **_):
+        return api.task_graph.TaskGraph(name)
+
+    with context.workflow.current.push(task_context):
+        test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph')
+        simple_before_task = api.task.OperationTask.node_instance(instance=node_instance,
+                                                                  name=operation_name)
+        simple_after_task = api.task.OperationTask.node_instance(instance=node_instance,
+                                                                 name=operation_name)
+
+        inner_task_graph = api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph')
+        inner_task = api.task.OperationTask.node_instance(instance=node_instance,
+                                                          name=operation_name)
+        inner_task_graph.add_tasks(inner_task)
+
+    test_task_graph.add_tasks(simple_before_task)
+    test_task_graph.add_tasks(simple_after_task)
+    test_task_graph.add_tasks(inner_task_graph)
+    test_task_graph.add_dependency(inner_task_graph, simple_before_task)
+    test_task_graph.add_dependency(simple_after_task, inner_task_graph)
+
+    # Direct check
+    execution_graph = DiGraph()
+    core.translation.build_execution_graph(task_graph=test_task_graph,
+                                           execution_graph=execution_graph)
+    execution_tasks = topological_sort(execution_graph)
+
+    assert len(execution_tasks) == 7
+
+    expected_tasks_names = [
+        '{0}-Start'.format(test_task_graph.id),
+        simple_before_task.id,
+        '{0}-Start'.format(inner_task_graph.id),
+        inner_task.id,
+        '{0}-End'.format(inner_task_graph.id),
+        simple_after_task.id,
+        '{0}-End'.format(test_task_graph.id)
+    ]
+
+    assert expected_tasks_names == execution_tasks
+
+    assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph),
+                      core.task.StartWorkflowTask)
+
+    _assert_execution_is_api_task(_get_task_by_name(execution_tasks[1], execution_graph),
+                                  simple_before_task)
+    assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph),
+                      core.task.StartSubWorkflowTask)
+
+    _assert_execution_is_api_task(_get_task_by_name(execution_tasks[3], execution_graph),
+                                  inner_task)
+    assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph),
+                      core.task.EndSubWorkflowTask)
+
+    _assert_execution_is_api_task(_get_task_by_name(execution_tasks[5], execution_graph),
+                                  simple_after_task)
+    assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph),
+                      core.task.EndWorkflowTask)
+
+
+def _assert_execution_is_api_task(execution_task, api_task):
+    assert execution_task.id == api_task.id
+    assert execution_task.name == api_task.name
+    assert execution_task.operation_mapping == api_task.operation_mapping
+    assert execution_task.actor == api_task.actor
+    assert execution_task.inputs == api_task.inputs
+
+
+def _get_task_by_name(task_name, graph):
+    return graph.node[task_name]['task']

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
new file mode 100644
index 0000000..ae1e83e
--- /dev/null
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
new file mode 100644
index 0000000..a425799
--- /dev/null
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -0,0 +1,141 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import uuid
+from contextlib import contextmanager
+
+import pytest
+import retrying
+
+from aria.storage import models
+from aria.orchestrator import events
+from aria.orchestrator.workflows.executor import (
+    thread,
+    multiprocess,
+    blocking,
+    # celery
+)
+
+try:
+    import celery as _celery
+    app = _celery.Celery()
+    app.conf.update(CELERY_RESULT_BACKEND='amqp://')
+except ImportError:
+    _celery = None
+    app = None
+
+
+class TestExecutor(object):
+
+    @pytest.mark.parametrize('executor_cls,executor_kwargs', [
+        (thread.ThreadExecutor, {'pool_size': 1}),
+        (thread.ThreadExecutor, {'pool_size': 2}),
+        (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
+        (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
+        (blocking.CurrentThreadBlockingExecutor, {}),
+        # (celery.CeleryExecutor, {'app': app})
+    ])
+    def test_execute(self, executor_cls, executor_kwargs):
+        self.executor = executor_cls(**executor_kwargs)
+        expected_value = 'value'
+        successful_task = MockTask(mock_successful_task)
+        failing_task = MockTask(mock_failing_task)
+        task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value})
+
+        for task in [successful_task, failing_task, task_with_inputs]:
+            self.executor.execute(task)
+
+        @retrying.retry(stop_max_delay=10000, wait_fixed=100)
+        def assertion():
+            assert successful_task.states == ['start', 'success']
+            assert failing_task.states == ['start', 'failure']
+            assert task_with_inputs.states == ['start', 'failure']
+            assert isinstance(failing_task.exception, MockException)
+            assert isinstance(task_with_inputs.exception, MockException)
+            assert task_with_inputs.exception.message == expected_value
+        assertion()
+
+    def setup_method(self):
+        events.start_task_signal.connect(start_handler)
+        events.on_success_task_signal.connect(success_handler)
+        events.on_failure_task_signal.connect(failure_handler)
+
+    def teardown_method(self):
+        events.start_task_signal.disconnect(start_handler)
+        events.on_success_task_signal.disconnect(success_handler)
+        events.on_failure_task_signal.disconnect(failure_handler)
+        if hasattr(self, 'executor'):
+            self.executor.close()
+
+
+def mock_successful_task(**_):
+    pass
+
+
+def mock_failing_task(**_):
+    raise MockException
+
+
+def mock_task_with_input(input, **_):
+    raise MockException(input)
+
+if app:
+    mock_successful_task = app.task(mock_successful_task)
+    mock_failing_task = app.task(mock_failing_task)
+    mock_task_with_input = app.task(mock_task_with_input)
+
+
+class MockException(Exception):
+    pass
+
+
+class MockTask(object):
+
+    INFINITE_RETRIES = models.Task.INFINITE_RETRIES
+
+    def __init__(self, func, inputs=None, ctx=None):
+        self.states = []
+        self.exception = None
+        self.id = str(uuid.uuid4())
+        name = func.__name__
+        operation = 'tests.orchestrator.workflows.executor.test_executor.{name}'.format(name=name)
+        self.operation_mapping = operation
+        self.logger = logging.getLogger()
+        self.name = name
+        self.inputs = inputs or {}
+        self.context = ctx or None
+        self.retry_count = 0
+        self.max_attempts = 1
+
+        for state in models.Task.STATES:
+            setattr(self, state.upper(), state)
+
+    @contextmanager
+    def _update(self):
+        yield self
+
+
+def start_handler(task, *args, **kwargs):
+    task.states.append('start')
+
+
+def success_handler(task, *args, **kwargs):
+    task.states.append('success')
+
+
+def failure_handler(task, exception, *args, **kwargs):
+    task.states.append('failure')
+    task.exception = exception

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/storage/test_drivers.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_drivers.py b/tests/storage/test_drivers.py
index 06e0e40..dccbe98 100644
--- a/tests/storage/test_drivers.py
+++ b/tests/storage/test_drivers.py
@@ -17,7 +17,7 @@ import os
 import pytest
 
 from aria.storage.drivers import FileSystemModelDriver, Driver, ModelDriver, ResourceDriver
-from aria.exceptions import StorageError
+from aria.storage.exceptions import StorageError
 
 from . import InMemoryModelDriver, TestFileSystem
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/storage/test_model_storage.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_model_storage.py b/tests/storage/test_model_storage.py
index a06cd5f..17e11ae 100644
--- a/tests/storage/test_model_storage.py
+++ b/tests/storage/test_model_storage.py
@@ -20,8 +20,8 @@ from aria.storage import (
     ModelStorage,
     models,
 )
-from aria.exceptions import StorageError
 from aria.storage import structures
+from aria.storage.exceptions import StorageError
 from aria.storage.structures import Model, Field, PointerField
 from aria import application_model_storage
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/storage/test_models.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_models.py b/tests/storage/test_models.py
index f2fce90..7e289e6 100644
--- a/tests/storage/test_models.py
+++ b/tests/storage/test_models.py
@@ -19,7 +19,7 @@ from datetime import datetime
 import pytest
 
 from aria.storage import Model, Field
-from aria.exceptions import StorageError
+from aria.storage.exceptions import StorageError
 from aria.storage.models import (
     DeploymentUpdateStep,
     Relationship,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/storage/test_models_api.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_models_api.py b/tests/storage/test_models_api.py
index 80826c3..2b92820 100644
--- a/tests/storage/test_models_api.py
+++ b/tests/storage/test_models_api.py
@@ -16,7 +16,7 @@
 import pytest
 
 from aria.storage import _ModelApi, models
-from aria.exceptions import StorageError
+from aria.storage.exceptions import StorageError
 
 from . import InMemoryModelDriver
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/storage/test_resource_storage.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_resource_storage.py b/tests/storage/test_resource_storage.py
index 9673a26..918b270 100644
--- a/tests/storage/test_resource_storage.py
+++ b/tests/storage/test_resource_storage.py
@@ -18,7 +18,7 @@ import tempfile
 
 import pytest
 
-from aria.exceptions import StorageError
+from aria.storage.exceptions import StorageError
 from aria.storage import ResourceStorage, FileSystemResourceDriver
 from . import TestFileSystem
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b1de02ca/tests/test_logger.py
----------------------------------------------------------------------
diff --git a/tests/test_logger.py b/tests/test_logger.py
index 37731bb..8c7a9af 100644
--- a/tests/test_logger.py
+++ b/tests/test_logger.py
@@ -23,7 +23,6 @@ from aria.logger import (create_logger,
                          LoggerMixin,
                          _DefaultConsoleFormat)
 
-
 def test_create_logger():
 
     logger = create_logger()


Reply via email to