This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3dd0c999f1 Attempt to stabilise tests for xcom_arg_map (#33150)
3dd0c999f1 is described below
commit 3dd0c999f1159a2fefbf32d9f10208a274a79a62
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sun Aug 6 13:24:37 2023 +0200
Attempt to stabilise tests for xcom_arg_map (#33150)
Similarly to #33145 - this is an attempt to stabilise flaky tests
for the test_xcom_arg_map.
Even if the mechanism is not entirely clear (provide_session should
also close the connection) seems like using pytest-fixture provided
session works better than relying on a new session created in run()
methods.
---
tests/models/test_xcom_arg_map.py | 26 +++++++++++++-------------
1 file changed, 13 insertions(+), 13 deletions(-)
diff --git a/tests/models/test_xcom_arg_map.py
b/tests/models/test_xcom_arg_map.py
index 293de37f50..541ab6fcfa 100644
--- a/tests/models/test_xcom_arg_map.py
+++ b/tests/models/test_xcom_arg_map.py
@@ -84,12 +84,12 @@ def test_xcom_map_transform_to_none(dag_maker, session):
# Run "push".
decision = dr.task_instance_scheduling_decisions(session=session)
for ti in decision.schedulable_tis:
- ti.run()
+ ti.run(session=session)
# Run "pull". This should automatically convert "c" to None.
decision = dr.task_instance_scheduling_decisions(session=session)
for ti in decision.schedulable_tis:
- ti.run()
+ ti.run(session=session)
assert results == {"a", "b", None}
@@ -118,19 +118,19 @@ def test_xcom_convert_to_kwargs_fails_task(dag_maker,
session):
# Run "push".
decision = dr.task_instance_scheduling_decisions(session=session)
for ti in decision.schedulable_tis:
- ti.run()
+ ti.run(session=session)
# Prepare to run "pull"...
decision = dr.task_instance_scheduling_decisions(session=session)
tis = {(ti.task_id, ti.map_index): ti for ti in decision.schedulable_tis}
# The first two "pull" tis should also succeed.
- tis[("pull", 0)].run()
- tis[("pull", 1)].run()
+ tis[("pull", 0)].run(session=session)
+ tis[("pull", 1)].run(session=session)
# But the third one fails because the map() result cannot be used as
kwargs.
with pytest.raises(ValueError) as ctx:
- tis[("pull", 2)].run()
+ tis[("pull", 2)].run(session=session)
assert str(ctx.value) == "expand_kwargs() expects a list[dict], not
list[None]"
assert [tis[("pull", i)].state for i in range(3)] == [
@@ -163,7 +163,7 @@ def test_xcom_map_error_fails_task(dag_maker, session):
# The "push" task should not fail.
decision = dr.task_instance_scheduling_decisions(session=session)
for ti in decision.schedulable_tis:
- ti.run()
+ ti.run(session=session)
assert [ti.state for ti in decision.schedulable_tis] ==
[TaskInstanceState.SUCCESS]
# Prepare to run "pull"...
@@ -171,12 +171,12 @@ def test_xcom_map_error_fails_task(dag_maker, session):
tis = {(ti.task_id, ti.map_index): ti for ti in decision.schedulable_tis}
# The first two "pull" tis should also succeed.
- tis[("pull", 0)].run()
- tis[("pull", 1)].run()
+ tis[("pull", 0)].run(session=session)
+ tis[("pull", 1)].run(session=session)
# But the third one (for "c") will fail.
with pytest.raises(ValueError) as ctx:
- tis[("pull", 2)].run()
+ tis[("pull", 2)].run(session=session)
assert str(ctx.value) == "nope"
assert [tis[("pull", i)].state for i in range(3)] == [
@@ -216,17 +216,17 @@ def test_xcom_map_raise_to_skip(dag_maker, session):
# Run "push".
decision = dr.task_instance_scheduling_decisions(session=session)
for ti in decision.schedulable_tis:
- ti.run()
+ ti.run(session=session)
# Run "forward". This should automatically skip "c".
decision = dr.task_instance_scheduling_decisions(session=session)
for ti in decision.schedulable_tis:
- ti.run()
+ ti.run(session=session)
# Now "collect" should only get "a" and "b".
decision = dr.task_instance_scheduling_decisions(session=session)
for ti in decision.schedulable_tis:
- ti.run()
+ ti.run(session=session)
assert result == ["a", "b"]