This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new c5964a50f51 Fix test_schedulable_task_exist_when_rerun_removed_upstream_mapped_task (#56065)
c5964a50f51 is described below

commit c5964a50f51dede0f90b9f48b2dc82f0f3c631ca
Author: Zhen-Lun (Kevin) Hong <[email protected]>
AuthorDate: Thu Sep 25 01:34:22 2025 +0530

    Fix test_schedulable_task_exist_when_rerun_removed_upstream_mapped_task (#56065)
    
    (cherry picked from commit 38ab153a283ac000e99f0378da7b2119ca877608)
---
 airflow-core/tests/unit/models/test_dagrun.py | 22 ++++++++++++++--------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py
index 47bf91d4687..74cf038dcec 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -2169,14 +2169,20 @@ def test_schedulable_task_exist_when_rerun_removed_upstream_mapped_task(session,
 
     dr = dag_maker.create_dagrun()
 
-    ti = dr.get_task_instance("do_something_else", session=session)
-    ti.map_index = 0
-    task = ti.task
-    for map_index in range(1, 5):
-        ti = TI(task, run_id=dr.run_id, map_index=map_index, dag_version_id=ti.dag_version_id)
-        session.add(ti)
-        ti.dag_run = dr
+    tis = dr.get_task_instances()
+    for ti in tis:
+        if ti.task_id == "do_something_else":
+            ti.map_index = 0
+            task = ti.task
+            for map_index in range(1, 5):
+                ti = TI(task, run_id=dr.run_id, map_index=map_index, dag_version_id=ti.dag_version_id)
+                session.add(ti)
+                ti.dag_run = dr
+        else:
+            # run tasks "do_something" to get XCOMs for correct downstream length
+            ti.run()
     session.flush()
+
     tis = dr.get_task_instances()
     for ti in tis:
         if ti.task_id == "do_something":
@@ -2188,7 +2194,7 @@ def test_schedulable_task_exist_when_rerun_removed_upstream_mapped_task(session,
     session.commit()
     # The Upstream is done with 2 removed tis and 3 success tis
     (tis, _) = dr.update_state()
-    assert len(tis)
+    assert len(tis) == 3
     assert dr.state != DagRunState.FAILED
 
 

Reply via email to