Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-294-Workflow-tasks-execution-is-not-in-order 9525df17a -> 62e67c1ca (forced update)
ARIA-294 Workflow tasks execution is not in order Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/62e67c1c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/62e67c1c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/62e67c1c Branch: refs/heads/ARIA-294-Workflow-tasks-execution-is-not-in-order Commit: 62e67c1ca39cfc658719230a49856ac01f0005a4 Parents: 7bba3ab Author: max-orlov <[email protected]> Authored: Tue Jun 27 20:32:08 2017 +0300 Committer: max-orlov <[email protected]> Committed: Wed Jun 28 11:03:55 2017 +0300 ---------------------------------------------------------------------- aria/modeling/orchestration.py | 7 +- aria/modeling/relationship.py | 106 ++++++++----------- aria/modeling/service_common.py | 2 +- aria/modeling/service_instance.py | 4 +- aria/modeling/service_template.py | 2 +- aria/orchestrator/workflow_runner.py | 4 +- .../workflows/core/graph_compiler.py | 4 +- tests/orchestrator/test_workflow_runner.py | 10 +- .../test_task_graph_into_execution_graph.py | 89 +++++++++++++--- .../workflows/executor/test_process_executor.py | 2 +- 10 files changed, 130 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62e67c1c/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index 5b02d1b..ab389d3 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -397,13 +397,8 @@ class TaskBase(mixins.ModelMixin): raise TaskRetryException(message, retry_interval=retry_interval) @declared_attr - def dependency_fk(self): - return relationship.foreign_key('task', nullable=True) - - @declared_attr def dependencies(cls): - # symmetric relationship causes funky graphs - 
return relationship.one_to_many_self(cls, 'dependency_fk') + return relationship.many_to_many(cls, 'task', self=True) def has_ended(self): return self.status in (self.SUCCESS, self.FAILED) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62e67c1c/aria/modeling/relationship.py ---------------------------------------------------------------------- diff --git a/aria/modeling/relationship.py b/aria/modeling/relationship.py index 40be5b2..30d174f 100644 --- a/aria/modeling/relationship.py +++ b/aria/modeling/relationship.py @@ -90,35 +90,6 @@ def one_to_one_self(model_class, fk): ) -def one_to_many_self(model_class, fk, dict_key=None): - """ - Declare a one-to-many relationship property. The property value would be a list or dict of - instances of the same model. - - You will need an associated foreign key to our own table. - - *This utility method should only be used during class creation.* - - :param model_class: The class in which this relationship will be declared - :type model_class: type - :param fk: Foreign key name - :type fk: basestring - :param dict_key: If set the value will be a dict with this key as the dict key; otherwise will - be a list - :type dict_key: basestring - """ - return _relationship( - model_class, - model_class.__tablename__, - relationship_kwargs={ - 'remote_side': '{model_class}.{remote_column}'.format( - model_class=model_class.__name__, remote_column=fk) - }, - back_populates=False, - dict_key=dict_key - ) - - def one_to_one(model_class, other_table, fk=None, @@ -162,11 +133,12 @@ def one_to_one(model_class, def one_to_many(model_class, - child_table, - child_fk=None, + other_table=None, + other_fk=None, dict_key=None, back_populates=None, - rel_kwargs=None): + rel_kwargs=None, + self=False): """ Declare a one-to-many relationship property. The property value would be a list or dict of instances of the child table's model. 
@@ -181,9 +153,9 @@ def one_to_many(model_class, :param model_class: The class in which this relationship will be declared :type model_class: type :param child_table: Child table name - :type child_table: basestring - :param child_fk: Foreign key name at the child table (no need specify if there's no ambiguity) - :type child_fk: basestring + :type other_table: basestring + :param other_fk: Foreign key name at the child table (no need specify if there's no ambiguity) + :type other_fk: basestring :param dict_key: If set the value will be a dict with this key as the dict key; otherwise will be a list :type dict_key: basestring @@ -191,18 +163,28 @@ def one_to_many(model_class, false to disable :type back_populates: basestring|bool """ - rel_kwargs = rel_kwargs or {} - rel_kwargs.setdefault('cascade', 'all') - if back_populates is None: - back_populates = model_class.__tablename__ + relationship_kwargs = rel_kwargs or {} + if self: + assert other_fk + other_table_name = model_class.__tablename__ + back_populates = False + relationship_kwargs['remote_side'] = '{model}.{column}'.format(model=model_class.__name__, + column=other_fk) + + else: + assert other_table + other_table_name = other_table + if back_populates is None: + back_populates = model_class.__tablename__ + relationship_kwargs.setdefault('cascade', 'all') return _relationship( model_class, - child_table, + other_table_name, back_populates=back_populates, - other_fk=child_fk, + other_fk=other_fk, dict_key=dict_key, - relationship_kwargs=rel_kwargs) + relationship_kwargs=relationship_kwargs) def many_to_one(model_class, @@ -247,7 +229,8 @@ def many_to_many(model_class, other_table, prefix=None, dict_key=None, - other_property=None): + other_property=None, + self=False): """ Declare a many-to-many relationship property. The property value would be a list or dict of instances of the other table's model. 
@@ -280,8 +263,8 @@ def many_to_many(model_class, this_column_name = '{0}_id'.format(this_table) this_foreign_key = '{0}.id'.format(this_table) - other_column_name = '{0}_id'.format(other_table) - other_foreign_key = '{0}.id'.format(other_table) + other_column_name = '{0}_{1}'.format(other_table, 'self_ref_id' if self else 'id') + other_foreign_key = '{0}.{1}'.format(other_table, 'id') secondary_table_name = '{0}_{1}'.format(this_table, other_table) @@ -299,13 +282,20 @@ def many_to_many(model_class, other_foreign_key ) - return _relationship( - model_class, - other_table, - relationship_kwargs={'secondary': secondary_table}, - backref_kwargs={'name': other_property, 'uselist': True} if other_property else None, - dict_key=dict_key - ) + kwargs = {'relationship_kwargs': {'secondary': secondary_table}} + + if self: + kwargs['back_populates'] = NO_BACK_POP + kwargs['relationship_kwargs']['primaryjoin'] = \ + getattr(model_class, 'id') == getattr(secondary_table.c, this_column_name) + kwargs['relationship_kwargs']['secondaryjoin'] = \ + getattr(model_class, 'id') == getattr(secondary_table.c, other_column_name) + else: + kwargs['backref_kwargs'] = \ + {'name': other_property, 'uselist': True} if other_property else None + kwargs['dict_key'] = dict_key + + return _relationship(model_class, other_table, **kwargs) def _relationship(model_class, @@ -368,14 +358,6 @@ def _get_secondary_table(metadata, return Table( name, metadata, - Column( - first_column, - Integer, - ForeignKey(first_foreign_key) - ), - Column( - second_column, - Integer, - ForeignKey(second_foreign_key) - ) + Column(first_column, Integer, ForeignKey(first_foreign_key)), + Column(second_column, Integer, ForeignKey(second_foreign_key)) ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62e67c1c/aria/modeling/service_common.py ---------------------------------------------------------------------- diff --git a/aria/modeling/service_common.py b/aria/modeling/service_common.py index 
272dfd7..0bb861f 100644 --- a/aria/modeling/service_common.py +++ b/aria/modeling/service_common.py @@ -320,7 +320,7 @@ class TypeBase(InstanceModelMixin): @declared_attr def children(cls): - return relationship.one_to_many_self(cls, 'parent_type_fk') + return relationship.one_to_many(cls, other_fk='parent_type_fk', self=True) # region foreign keys http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62e67c1c/aria/modeling/service_instance.py ---------------------------------------------------------------------- diff --git a/aria/modeling/service_instance.py b/aria/modeling/service_instance.py index 2bf9872..b451889 100644 --- a/aria/modeling/service_instance.py +++ b/aria/modeling/service_instance.py @@ -483,7 +483,7 @@ class NodeBase(InstanceModelMixin): @declared_attr def outbound_relationships(cls): return relationship.one_to_many( - cls, 'relationship', child_fk='source_node_fk', back_populates='source_node', + cls, 'relationship', other_fk='source_node_fk', back_populates='source_node', rel_kwargs=dict( order_by='Relationship.source_position', collection_class=ordering_list('source_position', count_from=0) @@ -493,7 +493,7 @@ class NodeBase(InstanceModelMixin): @declared_attr def inbound_relationships(cls): return relationship.one_to_many( - cls, 'relationship', child_fk='target_node_fk', back_populates='target_node', + cls, 'relationship', other_fk='target_node_fk', back_populates='target_node', rel_kwargs=dict( order_by='Relationship.target_position', collection_class=ordering_list('target_position', count_from=0) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62e67c1c/aria/modeling/service_template.py ---------------------------------------------------------------------- diff --git a/aria/modeling/service_template.py b/aria/modeling/service_template.py index 4d1e837..344da6d 100644 --- a/aria/modeling/service_template.py +++ b/aria/modeling/service_template.py @@ -493,7 +493,7 @@ class NodeTemplateBase(TemplateModelMixin): 
@declared_attr def requirement_templates(cls): - return relationship.one_to_many(cls, 'requirement_template', child_fk='node_template_fk') + return relationship.one_to_many(cls, 'requirement_template', other_fk='node_template_fk') @declared_attr def properties(cls): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62e67c1c/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 4a50fb2..3d58386 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -97,8 +97,8 @@ class WorkflowRunner(object): if not self._is_resume: workflow_fn = self._get_workflow_fn() self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict) - graph_compiler.GraphCompiler(self._workflow_context, executor.__class__).compile( - self._tasks_graph) + compiler = graph_compiler.GraphCompiler(self._workflow_context, executor.__class__) + compiler.compile(self._tasks_graph) self._engine = engine.Engine(executors={executor.__class__: executor}) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62e67c1c/aria/orchestrator/workflows/core/graph_compiler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/graph_compiler.py b/aria/orchestrator/workflows/core/graph_compiler.py index f339038..81543d5 100644 --- a/aria/orchestrator/workflows/core/graph_compiler.py +++ b/aria/orchestrator/workflows/core/graph_compiler.py @@ -37,7 +37,6 @@ class GraphCompiler(object): :param end_stub_type: internal use :param depends_on: internal use """ - task_graph = task_graph or self._task_graph depends_on = list(depends_on) # Insert start marker @@ -110,8 +109,7 @@ class GraphCompiler(object): """ tasks = [] for dependency in dependencies: - if getattr(dependency, 'actor', False): - # This is + if isinstance(dependency, 
(api.task.StubTask, api.task.OperationTask)): dependency_name = dependency.id else: dependency_name = self._end_graph_suffix(dependency.id) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62e67c1c/tests/orchestrator/test_workflow_runner.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py index 103596b..e640c7d 100644 --- a/tests/orchestrator/test_workflow_runner.py +++ b/tests/orchestrator/test_workflow_runner.py @@ -350,11 +350,11 @@ class TestResumableWorkflows(object): if events['execution_ended'].wait(60) is False: raise TimeoutError("Execution did not end") - first_task, second_task = workflow_context.model.task.list(filters={'_stub_type': None}) - assert first_task.status == first_task.SUCCESS - assert second_task.status in (second_task.FAILED, second_task.RETRYING) + tasks = workflow_context.model.task.list(filters={'_stub_type': None}) + assert any(task.status == task.SUCCESS for task in tasks) + assert any(task.status in (task.FAILED, task.RETRYING) for task in tasks) events['is_resumed'].set() - assert second_task.status in (second_task.FAILED, second_task.RETRYING) + assert any(task.status in (task.FAILED, task.RETRYING) for task in tasks) # Create a new workflow runner, with an existing execution id. This would cause # the old execution to restart. @@ -370,7 +370,7 @@ class TestResumableWorkflows(object): new_wf_runner.execute() # Wait for it to finish and assert changes. 
- assert second_task.status == second_task.SUCCESS + assert all(task.status == task.SUCCESS for task in tasks) assert node.attributes['invocations'].value == 3 assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62e67c1c/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py index f0d2b26..e24c901 100644 --- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py +++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py @@ -26,15 +26,17 @@ from tests import storage def test_task_graph_into_execution_graph(tmpdir): interface_name = 'Standard' - operation_name = 'create' + op1_name, op2_name, op3_name = 'create', 'configure', 'start' workflow_context = mock.context.simple(str(tmpdir)) node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) interface = mock.models.create_interface( node.service, interface_name, - operation_name, + op1_name, operation_kwargs=dict(function='test') ) + interface.operations[op2_name] = mock.models.create_operation(op2_name) # pylint: disable=unsubscriptable-object + interface.operations[op3_name] = mock.models.create_operation(op3_name) # pylint: disable=unsubscriptable-object node.interfaces[interface.name] = interface workflow_context.model.node.update(node) @@ -46,18 +48,31 @@ def test_task_graph_into_execution_graph(tmpdir): simple_before_task = api.task.OperationTask( node, interface_name=interface_name, - operation_name=operation_name) + operation_name=op1_name) simple_after_task = api.task.OperationTask( node, interface_name=interface_name, - operation_name=operation_name) + operation_name=op1_name) inner_task_graph = 
api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph') - inner_task = api.task.OperationTask( + inner_task_1 = api.task.OperationTask( node, interface_name=interface_name, - operation_name=operation_name) - inner_task_graph.add_tasks(inner_task) + operation_name=op1_name) + inner_task_2 = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=op2_name) + inner_task_3 = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=op3_name) + inner_task_graph.add_tasks(inner_task_1) + inner_task_graph.add_tasks(inner_task_2) + inner_task_graph.add_tasks(inner_task_3) + inner_task_graph.add_dependency(inner_task_2, inner_task_1) + inner_task_graph.add_dependency(inner_task_3, inner_task_1) + inner_task_graph.add_dependency(inner_task_3, inner_task_2) test_task_graph.add_tasks(simple_before_task) test_task_graph.add_tasks(simple_after_task) @@ -70,13 +85,15 @@ def test_task_graph_into_execution_graph(tmpdir): execution_tasks = topological_sort(_graph(workflow_context.execution.tasks)) - assert len(execution_tasks) == 7 + assert len(execution_tasks) == 9 expected_tasks_names = [ '{0}-Start'.format(test_task_graph.id), simple_before_task.id, '{0}-Start'.format(inner_task_graph.id), - inner_task.id, + inner_task_1.id, + inner_task_2.id, + inner_task_3.id, '{0}-End'.format(inner_task_graph.id), simple_after_task.id, '{0}-End'.format(test_task_graph.id) @@ -86,17 +103,55 @@ def test_task_graph_into_execution_graph(tmpdir): assert all(isinstance(task, models.Task) for task in execution_tasks) execution_tasks = iter(execution_tasks) - assert next(execution_tasks)._stub_type == models.Task.START_WORKFLOW - _assert_execution_is_api_task(next(execution_tasks), simple_before_task) - assert next(execution_tasks)._stub_type == models.Task.START_SUBWROFKLOW - _assert_execution_is_api_task(next(execution_tasks), inner_task) - assert next(execution_tasks)._stub_type == models.Task.END_SUBWORKFLOW - 
_assert_execution_is_api_task(next(execution_tasks), simple_after_task) - assert next(execution_tasks)._stub_type == models.Task.END_WORKFLOW - + _assert_tasks( + iter(execution_tasks), + iter([simple_before_task, inner_task_1, inner_task_2, inner_task_3, simple_after_task]) + ) storage.release_sqlite_storage(workflow_context.model) +def _assert_tasks(execution_tasks, api_tasks): + start_workflow_exec_task = next(execution_tasks) + assert start_workflow_exec_task._stub_type == models.Task.START_WORKFLOW + + before_exec_task = next(execution_tasks) + simple_before_task = next(api_tasks) + _assert_execution_is_api_task(before_exec_task, simple_before_task) + assert before_exec_task.dependencies == [start_workflow_exec_task] + + start_subworkflow_exec_task = next(execution_tasks) + assert start_subworkflow_exec_task._stub_type == models.Task.START_SUBWROFKLOW + assert start_subworkflow_exec_task.dependencies == [before_exec_task] + + inner_exec_task_1 = next(execution_tasks) + inner_task_1 = next(api_tasks) + _assert_execution_is_api_task(inner_exec_task_1, inner_task_1) + assert inner_exec_task_1.dependencies == [start_subworkflow_exec_task] + + inner_exec_task_2 = next(execution_tasks) + inner_task_2 = next(api_tasks) + _assert_execution_is_api_task(inner_exec_task_2, inner_task_2) + assert inner_exec_task_2.dependencies == [inner_exec_task_1] + + inner_exec_task_3 = next(execution_tasks) + inner_task_3 = next(api_tasks) + _assert_execution_is_api_task(inner_exec_task_3, inner_task_3) + assert sorted(inner_exec_task_3.dependencies) == sorted([inner_exec_task_1, inner_exec_task_2]) + + end_subworkflow_exec_task = next(execution_tasks) + assert end_subworkflow_exec_task._stub_type == models.Task.END_SUBWORKFLOW + assert end_subworkflow_exec_task.dependencies == [inner_exec_task_3] + + after_exec_task = next(execution_tasks) + simple_after_task = next(api_tasks) + _assert_execution_is_api_task(after_exec_task, simple_after_task) + assert after_exec_task.dependencies == 
[end_subworkflow_exec_task] + + end_workflow_exec_task = next(execution_tasks) + assert end_workflow_exec_task._stub_type == models.Task.END_WORKFLOW + assert end_workflow_exec_task.dependencies == [after_exec_task] + + def _assert_execution_is_api_task(execution_task, api_task): assert execution_task.name == api_task.name assert execution_task.function == api_task.function http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62e67c1c/tests/orchestrator/workflows/executor/test_process_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py index 6f5c827..6cac288 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor.py +++ b/tests/orchestrator/workflows/executor/test_process_executor.py @@ -99,7 +99,7 @@ class TestProcessExecutor(object): executor.terminate(ctx.task.id) # Give a chance to the processes to terminate - time.sleep(2) + time.sleep(10) # windows might require more time assert not any(p.pid == pid and p.status() != psutil.STATUS_ZOMBIE for p in psutil.process_iter() for pid in pids)
