This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-7-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit c80d91be62b643fe21efca8fdc5a94d724cf3f58 Author: Jarek Potiuk <[email protected]> AuthorDate: Sun Aug 6 13:24:37 2023 +0200 Attempt to stabilise tests for xcom_arg_map (#33150) Similarly to #33145 - this is an attempt to stabilise flaky tests for the test_xcom_arg_map. Even if the mechanism is not entirely clear (provide_session should also close the connection), it seems that using the pytest-fixture-provided session works better than relying on a new session created in the run() methods. (cherry picked from commit 3dd0c999f1159a2fefbf32d9f10208a274a79a62) --- tests/models/test_xcom_arg_map.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/tests/models/test_xcom_arg_map.py b/tests/models/test_xcom_arg_map.py index d8c087b89e..05775da574 100644 --- a/tests/models/test_xcom_arg_map.py +++ b/tests/models/test_xcom_arg_map.py @@ -84,12 +84,12 @@ def test_xcom_map_transform_to_none(dag_maker, session): # Run "push". decision = dr.task_instance_scheduling_decisions(session=session) for ti in decision.schedulable_tis: - ti.run() + ti.run(session=session) # Run "pull". This should automatically convert "c" to None. decision = dr.task_instance_scheduling_decisions(session=session) for ti in decision.schedulable_tis: - ti.run() + ti.run(session=session) assert results == {"a", "b", None} @@ -118,19 +118,19 @@ def test_xcom_convert_to_kwargs_fails_task(dag_maker, session): # Run "push". decision = dr.task_instance_scheduling_decisions(session=session) for ti in decision.schedulable_tis: - ti.run() + ti.run(session=session) # Prepare to run "pull"... decision = dr.task_instance_scheduling_decisions(session=session) tis = {(ti.task_id, ti.map_index): ti for ti in decision.schedulable_tis} # The first two "pull" tis should also succeed. 
- tis[("pull", 0)].run() - tis[("pull", 1)].run() + tis[("pull", 0)].run(session=session) + tis[("pull", 1)].run(session=session) # But the third one fails because the map() result cannot be used as kwargs. with pytest.raises(ValueError) as ctx: - tis[("pull", 2)].run() + tis[("pull", 2)].run(session=session) assert str(ctx.value) == "expand_kwargs() expects a list[dict], not list[None]" assert [tis[("pull", i)].state for i in range(3)] == [ @@ -163,7 +163,7 @@ def test_xcom_map_error_fails_task(dag_maker, session): # The "push" task should not fail. decision = dr.task_instance_scheduling_decisions(session=session) for ti in decision.schedulable_tis: - ti.run() + ti.run(session=session) assert [ti.state for ti in decision.schedulable_tis] == [TaskInstanceState.SUCCESS] # Prepare to run "pull"... @@ -171,12 +171,12 @@ def test_xcom_map_error_fails_task(dag_maker, session): tis = {(ti.task_id, ti.map_index): ti for ti in decision.schedulable_tis} # The first two "pull" tis should also succeed. - tis[("pull", 0)].run() - tis[("pull", 1)].run() + tis[("pull", 0)].run(session=session) + tis[("pull", 1)].run(session=session) # But the third one (for "c") will fail. with pytest.raises(ValueError) as ctx: - tis[("pull", 2)].run() + tis[("pull", 2)].run(session=session) assert str(ctx.value) == "nope" assert [tis[("pull", i)].state for i in range(3)] == [ @@ -216,17 +216,17 @@ def test_xcom_map_raise_to_skip(dag_maker, session): # Run "push". decision = dr.task_instance_scheduling_decisions(session=session) for ti in decision.schedulable_tis: - ti.run() + ti.run(session=session) # Run "forward". This should automatically skip "c". decision = dr.task_instance_scheduling_decisions(session=session) for ti in decision.schedulable_tis: - ti.run() + ti.run(session=session) # Now "collect" should only get "a" and "b". decision = dr.task_instance_scheduling_decisions(session=session) for ti in decision.schedulable_tis: - ti.run() + ti.run(session=session) assert result == ["a", "b"]
