benbuckman commented on issue #34023:
URL: https://github.com/apache/airflow/issues/34023#issuecomment-1705862117

   Thanks @hussein-awala for digging into the fix so quickly.
   
   Something else worth investigating and fixing here is why unit tests that 
use `DebugExecutor` behave differently from the real scheduler. Take this unit 
test, for example (simplified again for demonstration):
   
   ```python
   import unittest
   from datetime import datetime, timezone
   
   from airflow.exceptions import BackfillUnfinished
   from airflow.executors.debug_executor import DebugExecutor
   from airflow.models import DagBag
   from airflow.models.taskinstance import TaskInstance
   
   from .demo_trigger_one_failed import demo_trigger_one_failed
   
   class TestDemoDag(unittest.TestCase):
       def test_handle_failed_delivery(self):
           dagbag = DagBag(include_examples=False, safe_mode=False)
           demo_dag = dagbag.get_dag("demo_trigger_one_failed")
           now = datetime.now(timezone.utc)
   
           # We need to use the slow DebugExecutor (not `dag.test()`) to run this
           # b/c of https://github.com/apache/airflow/discussions/32831
           demo_dag.clear()
           with self.assertRaises(BackfillUnfinished):
               demo_dag.run(
                   start_date=now,
                   end_date=now,
                   executor=DebugExecutor(),
                   run_at_least_once=True,
                   verbose=True,
                   disable_retry=True,
               )
   
           downstream_task_ids = list(
               demo_dag.task_group_dict["deliver_records"].children.keys()
           )
           print(f"downstream_task_ids: {downstream_task_ids}")
   
           task_instance_states: dict[str, str | None] = {}  # task_id => state
   
           for task_id in downstream_task_ids:
               # (demo simplified w/ hard-coded 0 for single mapped task)
               ti = TaskInstance(
                   demo_dag.task_dict[task_id], execution_date=now, map_index=0
               )
               task_instance_states[task_id] = ti.current_state()
   
           print(f"task_instance_states: {task_instance_states}")
   
           self.assertEqual(
               "success", task_instance_states["deliver_records.submit_job"]
           )
           self.assertEqual(
               "failed", task_instance_states["deliver_records.fake_sensor"]
           )
           self.assertEqual(
               "upstream_failed",
               task_instance_states["deliver_records.deliver_record"],
           )
   
           # Test says this ran and succeeded - but in actual scheduler/UI,
           # that's not true!
           self.assertEqual(
               "success",
               task_instance_states["deliver_records.handle_failed_delivery"],
           )
   ```
   
   Put that in a file `test_demo_trigger_one_failed.py` next to the file with 
the DAG above, and run it with `python -m unittest 
path/to/test_demo_trigger_one_failed.py`.
   
   Notes:
   - As commented inline, this uses `DebugExecutor` not `dag.test()` because 
the latter [cannot test error 
cases](https://github.com/apache/airflow/discussions/32831).
   - The error in `fake_sensor_task` is in the DAG itself (contrived for demo); 
in a real test this would be stubbed.
   
   `task_instance_states` at the end is shown to be:
   ```
   task_instance_states: 
   {'deliver_records.submit_job': 'success',
    'deliver_records.fake_sensor': 'failed',
    'deliver_records.deliver_record': 'upstream_failed',
    'deliver_records.handle_failed_delivery': 'success'
   }
   ```
   and the test _passes_, asserting that `handle_failed_delivery` _succeeded_. 
This test output is misleading because, as shown above, `handle_failed_delivery` 
doesn't run at all under the real scheduler! (Its `ti.current_state()` should be 
`None`, not `success`.)
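
To make the mismatch concrete, here is a minimal sketch (plain Python, no Airflow imports) that compares the states reported under `DebugExecutor` with the state described above for the real scheduler. The `diff_states` helper and the `expected_in_scheduler` dict are hypothetical names introduced for illustration only; the expected value covers just `handle_failed_delivery`, since that is the only task whose scheduler state is stated here.

```python
from __future__ import annotations

from typing import Optional

State = Optional[str]


def diff_states(
    observed: dict[str, State], expected: dict[str, State]
) -> dict[str, tuple[State, State]]:
    """Return {task_id: (observed, expected)} for each task whose state differs."""
    return {
        task_id: (observed.get(task_id), want)
        for task_id, want in expected.items()
        if observed.get(task_id) != want
    }


# States printed by the DebugExecutor-based unit test above.
observed = {
    "deliver_records.submit_job": "success",
    "deliver_records.fake_sensor": "failed",
    "deliver_records.deliver_record": "upstream_failed",
    "deliver_records.handle_failed_delivery": "success",
}

# Assumption: under the real scheduler the task never runs, so its state is None.
expected_in_scheduler = {"deliver_records.handle_failed_delivery": None}

mismatches = diff_states(observed, expected_in_scheduler)
print(mismatches)
# {'deliver_records.handle_failed_delivery': ('success', None)}
```

In other words, if `DebugExecutor` matched the scheduler, the last assertion in the test would have to be an `assertIsNone(...)` rather than `assertEqual("success", ...)`.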
   
   We have unit tests like this in our application, so we were confident that 
our actual task with `trigger_rule=ONE_FAILED` would work correctly, and we were 
very surprised when it broke in production.
   
   In the process of fixing this with the "real" executors 
(`SequentialExecutor` in the simple demo above; `KubernetesExecutor` et al. in a 
real production application), it would be great if `DebugExecutor` behaved the 
same way as the others, so users can rely on test coverage.
   
   Thank you!

