kevinhongzl commented on code in PR #62034:
URL: https://github.com/apache/airflow/pull/62034#discussion_r3234731428
##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -2855,6 +2855,127 @@ def mapped_print_value(arg):
assert len(success_tis) == rerun_length
+def test_mapped_task_length_reduction_rerun_downstream_not_deadlocked(session, dag_maker):
+ @task
+ def producer():
+ context = get_current_context()
+ if context["ti"].try_number == 0:
+ return [i for i in range(3)]
+ return [i for i in range(2)]
+
+ @task
+ def work(arg):
+ return arg
+
+ @task
+ def finish(data):
+ return sum(data)
+
+ def _task_ids(tis):
+ return [(ti.task_id, ti.map_index) for ti in tis]
+
+ with dag_maker(session=session):
+ produced = producer()
+ mapped = work.expand(arg=produced)
+ done = finish(produced)
+ mapped >> done
+
+ dr: DagRun = dag_maker.create_dagrun()
+
+ # First run with 3 mapped task instances.
+ dag_maker.run_ti("producer", dr)
+ decision = dr.task_instance_scheduling_decisions(session=session)
+ assert _task_ids(decision.schedulable_tis) == [("work", 0), ("work", 1), ("work", 2)]
+
+ for ti in decision.schedulable_tis:
+ dag_maker.run_ti(ti.task_id, dr, map_index=ti.map_index)
+ decision = dr.task_instance_scheduling_decisions(session=session)
+ assert _task_ids(decision.schedulable_tis) == [("finish", -1)]
+ dag_maker.run_ti("finish", dr)
+
+ # Clear and rerun with one fewer mapped task instance.
+ clear_task_instances(dr.get_task_instances(session=session), session=session)
+ ti = dr.get_task_instance(task_id="producer", session=session)
+ ti.try_number += 1
+ session.merge(ti)
+
+ dag_maker.run_ti("producer", dr)
+ decision = dr.task_instance_scheduling_decisions(session=session)
+ assert _task_ids(decision.schedulable_tis) == [("work", 0), ("work", 1)]
+
+ mapped_states = session.execute(
+ select(TI.map_index, TI.state)
+ .where(TI.task_id == "work", TI.dag_id == dr.dag_id, TI.run_id == dr.run_id)
+ .order_by(TI.map_index)
+ ).all()
+ assert mapped_states == [
+ (0, State.NONE),
+ (1, State.NONE),
+ (2, TaskInstanceState.REMOVED),
+ ]
+
+ for ti in decision.schedulable_tis:
+ dag_maker.run_ti(ti.task_id, dr, map_index=ti.map_index)
+ decision = dr.task_instance_scheduling_decisions(session=session)
+ assert _task_ids(decision.schedulable_tis) == [("finish", -1)]
+
+ dag_maker.run_ti("finish", dr)
+ finish_ti = dr.get_task_instance(task_id="finish", map_index=-1, session=session)
+ assert finish_ti
+ assert finish_ti.state == TaskInstanceState.SUCCESS
+
+
+def test_rerun_with_upstream_task_removed(session, dag_maker):
Review Comment:
I guess this test would be enough to check the case where both upstream and
downstream tasks are non-mapped?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]