benbuckman commented on issue #34023:
URL: https://github.com/apache/airflow/issues/34023#issuecomment-1705862117
Thanks @hussein-awala for digging into the fix so quickly.
Something else worth looking into and fixing here is why unit tests with
`DebugExecutor` behave differently. Take this unit test as an example, again
simplified for demonstration:
```python
import unittest
from datetime import datetime, timezone

from airflow.exceptions import BackfillUnfinished
from airflow.executors.debug_executor import DebugExecutor
from airflow.models import DagBag
from airflow.models.taskinstance import TaskInstance

from .demo_trigger_one_failed import demo_trigger_one_failed


class TestDemoDag(unittest.TestCase):
    def test_handle_failed_delivery(self):
        dagbag = DagBag(include_examples=False, safe_mode=False)
        demo_dag = dagbag.get_dag("demo_trigger_one_failed")
        now = datetime.now(timezone.utc)

        # We need to use the slow DebugExecutor (not `dag.test()`) to run this
        # b/c of https://github.com/apache/airflow/discussions/32831
        demo_dag.clear()
        with self.assertRaises(BackfillUnfinished):
            demo_dag.run(
                start_date=now,
                end_date=now,
                executor=DebugExecutor(),
                run_at_least_once=True,
                verbose=True,
                disable_retry=True,
            )

        downstream_task_ids = list(
            demo_dag.task_group_dict["deliver_records"].children.keys()
        )
        print(f"downstream_task_ids: {downstream_task_ids}")

        task_instance_states: dict[str, str | None] = {}  # task_id => state
        for task_id in downstream_task_ids:
            # (demo simplified w/ hard-coded 0 for single mapped task)
            ti = TaskInstance(
                demo_dag.task_dict[task_id], execution_date=now, map_index=0
            )
            task_instance_states[task_id] = ti.current_state()
        print(f"task_instance_states: {task_instance_states}")

        self.assertEqual("success", task_instance_states["deliver_records.submit_job"])
        self.assertEqual("failed", task_instance_states["deliver_records.fake_sensor"])
        self.assertEqual(
            "upstream_failed", task_instance_states["deliver_records.deliver_record"]
        )
        # Test says this ran and succeeded - but in actual scheduler/UI,
        # that's not true!
        self.assertEqual(
            "success", task_instance_states["deliver_records.handle_failed_delivery"]
        )
```
Put that in a file `test_demo_trigger_one_failed.py` next to the file with
the DAG above, and run it with
`python -m unittest path/to/test_demo_trigger_one_failed.py`.
Note:
- As commented inline, this uses `DebugExecutor` rather than `dag.test()`,
because the latter [cannot test error
cases](https://github.com/apache/airflow/discussions/32831).
- The error in `fake_sensor_task` is in the DAG itself (contrived for the
demo); in a real test this would be stubbed.
`task_instance_states` at the end is shown to be:
```
task_instance_states: {
    'deliver_records.submit_job': 'success',
    'deliver_records.fake_sensor': 'failed',
    'deliver_records.deliver_record': 'upstream_failed',
    'deliver_records.handle_failed_delivery': 'success'
}
```
and the test _passes_, asserting that `handle_failed_delivery` _succeeded_.
This test output is misleading because, as we saw above, in the real
scheduler `handle_failed_delivery` doesn't run at all! (Its
`ti.current_state()` should be `None`, not `success`.)
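To make the gap concrete, here is a plain-Python sketch (no Airflow required;
the state values are the ones reported above, and the dict names are just
illustrative) of what the real scheduler reports versus what the
`DebugExecutor`-based test observes:

```python
# States as the real scheduler/UI reports them (per the issue above):
scheduler_states = {
    "deliver_records.submit_job": "success",
    "deliver_records.fake_sensor": "failed",
    "deliver_records.deliver_record": "upstream_failed",
    "deliver_records.handle_failed_delivery": None,  # never ran at all
}

# States as the DebugExecutor-based unit test observes them:
debug_executor_states = {
    **scheduler_states,
    "deliver_records.handle_failed_delivery": "success",  # ran and "succeeded"
}

# The parity gap: every task whose observed state differs between the two.
mismatches = {
    task_id
    for task_id in scheduler_states
    if scheduler_states[task_id] != debug_executor_states[task_id]
}
print(mismatches)
```

This prints `{'deliver_records.handle_failed_delivery'}`: the one task whose
test-time state disagrees with production, which is exactly the task our test
coverage was supposed to protect.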
We have unit tests like this in our application; we were confident that our
actual task with `trigger_rule=ONE_FAILED` would work correctly, and were very
surprised when it broke in production.
Alongside the fix for the "real" executors (`SequentialExecutor` in the
simple demo above; `KubernetesExecutor` et al. in a real production
application), it would be great if `DebugExecutor` behaved at parity with the
others, so users can rely on their test coverage.
Thank you!