kevinhongzl opened a new issue, #56054:
URL: https://github.com/apache/airflow/issues/56054
### Apache Airflow version
main (development)
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
The unit test
`test_schedulable_task_exist_when_rerun_removed_upstream_mapped_task` is
passing incorrectly.
### What you think should happen instead?
We should be getting three downstream dynamic mapped tasks
(`do_something_else`) since two of the five upstream mapped tasks
(`do_something`) were removed. However, the test passed when only a single
schedulable task was created.
### How to reproduce
1. Modify
`test_schedulable_task_exist_when_rerun_removed_upstream_mapped_task` in
`tests/unit/models/test_dagrun.py` by adding `print(tis)`
```Python
def
test_schedulable_task_exist_when_rerun_removed_upstream_mapped_task(session,
dag_maker):
from airflow.sdk import task
@task
def do_something(i):
return 1
@task
def do_something_else(i):
return 1
with dag_maker():
nums = do_something.expand(i=[i + 1 for i in range(5)])
do_something_else.expand(i=nums)
dr = dag_maker.create_dagrun()
ti = dr.get_task_instance("do_something_else", session=session)
ti.map_index = 0
task = ti.task
for map_index in range(1, 5):
ti = TI(task, run_id=dr.run_id, map_index=map_index,
dag_version_id=ti.dag_version_id)
session.add(ti)
ti.dag_run = dr
session.flush()
tis = dr.get_task_instances()
for ti in tis:
if ti.task_id == "do_something":
if ti.map_index > 2:
ti.state = TaskInstanceState.REMOVED
else:
ti.state = TaskInstanceState.SUCCESS
session.merge(ti)
session.commit()
# The Upstream is done with 2 removed tis and 3 success tis
(tis, _) = dr.update_state()
print(tis) # <------- add this line
assert len(tis)
assert dr.state != DagRunState.FAILED
```
2. Run the unit test: `pytest
test_dagrun.py::test_schedulable_task_exist_when_rerun_removed_upstream_mapped_task
-s`
3. You can see something like
```
test_dagrun.py::test_schedulable_task_exist_when_rerun_removed_upstream_mapped_task
========================= AIRFLOW ==========================
Home of the user: /root
Airflow home /root/airflow
Skipping initializing of the DB as it was initialized already.
You can re-initialize the database by adding --with-db-init flag when
running tests.
2025-09-24T13:25:13.991194Z [warning ] Skipping masking for a secret as
it's too short (<5 chars) [airflow._shared.secrets_masker.secrets_masker]
2025-09-24T13:25:14.068837Z [info ] Filling up the DagBag from /dev/null
[airflow.models.dagbag.DagBag]
2025-09-24T13:25:14.121993Z [info ] Sync 1 DAGs
[airflow.serialization.serialized_objects]
2025-09-24T13:25:14.136601Z [info ] Creating ORM DAG for test_dag
[airflow.dag_processing.collection]
[<TaskInstance: test_dag.do_something_else test map_index=0 [None]>]
PASSED
```
### Operating System
Ubuntu 22.04.5 LTS
### Versions of Apache Airflow Providers
_No response_
### Deployment
Virtualenv installation
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]