This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7a28ee3 Fix occasional external task sensor tests (#18853)
7a28ee3 is described below
commit 7a28ee370945de81fe8a16eac63197cbe93b3c3a
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue Oct 12 20:27:25 2021 +0200
Fix occasional external task sensor tests (#18853)
Occassionally the sensor tests fail with assertion where
state seems to be None. This might be caused by
```
def assert_ti_state_equal(task_instance, state):
"""
Assert state of task_instances equals the given state.
"""
task_instance.refresh_from_db()
> assert task_instance.state == state
E AssertionError: assert None == <TaskInstanceState.SUCCESS:
'success'>
E + where None = <TaskI$anstance: dag_1.task_b_1
manual__2015-01-01T00:00:00+00:00 [None]>.state
```
Turned out it was because the task instance fields from
dagrun.taskinstance relationship could be returned in different
order so some of the dependencies were not met for some of the
tasks when later task was returned before earlier one.
Deterministic sorting according to task_id solved the problem.
---
tests/sensors/test_external_task_sensor.py | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/tests/sensors/test_external_task_sensor.py
b/tests/sensors/test_external_task_sensor.py
index d1e150b..e507c7c 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -569,7 +569,12 @@ def run_tasks(dag_bag, execution_date=DEFAULT_DATE,
session=None):
run_type=DagRunType.MANUAL,
session=session,
)
- for ti in dagrun.task_instances:
+ # we use sorting by task_id here because for the test DAG structure of
ours
+ # this is equivalent to topological sort. It would not work in general
case
+ # but it works for our case because we specifically constructed test
DAGS
+ # in the way that those two sort methods are equivalent
+ tasks = sorted((ti for ti in dagrun.task_instances), key=lambda ti:
ti.task_id)
+ for ti in tasks:
ti.refresh_from_task(dag.get_task(ti.task_id))
tis[ti.task_id] = ti
ti.run(session=session)