jason810496 commented on code in PR #56162:
URL: https://github.com/apache/airflow/pull/56162#discussion_r2387141067
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1541,7 +1541,8 @@ def _expand_mapped_task_if_needed(ti: TI) -> Iterable[TI] | None:
)
)
revised_map_index_task_ids.add(schedulable.task.task_id)
- ready_tis.append(schedulable)
+ if schedulable.state in SCHEDULEABLE_STATES:
+ ready_tis.append(schedulable)
Review Comment:
It would be nice to add a comment explaining that `_revise_map_indexes_if_mapped`
might set the TI to the `REMOVED` state, which is why we have to check
`schedulable.state` again before adding it to `ready_tis`.
##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -2253,6 +2253,56 @@ def tg(x, y):
}
+@pytest.mark.parametrize("rerun_length", [0, 1, 2, 3])
+def test_mapped_task_rerun_with_different_length_of_args(session, dag_maker, rerun_length):
+ @task
+ def generate_mapping_args():
+ context = get_current_context()
+ if context["ti"].try_number == 0:
+ args = [i for i in range(2)]
+ else:
+ args = [i for i in range(rerun_length)]
+ return args
+
+ @task
+ def mapped_print_value(arg):
+ return arg
+
+ with dag_maker(session=session):
+ args = generate_mapping_args()
+ mapped_print_value.expand(arg=args)
+
+ # First Run
+ dr = dag_maker.create_dagrun()
+ dag_maker.run_ti("generate_mapping_args", dr)
+
+ decision = dr.task_instance_scheduling_decisions(session=session)
+ for ti in decision.schedulable_tis:
+ dag_maker.run_ti(ti.task_id, dr, map_index=ti.map_index)
+
+ clear_task_instances(dr.get_task_instances(), session=session)
+
+ # Second Run
+ ti = dr.get_task_instance(task_id="generate_mapping_args", session=session)
+ ti.try_number += 1
+ session.merge(ti)
+ dag_maker.run_ti("generate_mapping_args", dr)
+
+ # Check if the new mapped task instances are correctly scheduled
+ decision = dr.task_instance_scheduling_decisions(session=session)
+ assert len(decision.schedulable_tis) == rerun_length
+ assert all([ti.task_id == "mapped_print_value" for ti in decision.schedulable_tis])
+
+ # Check if mapped task rerun successfully
+ for ti in decision.schedulable_tis:
+ dag_maker.run_ti(ti.task_id, dr, map_index=ti.map_index)
+ query = select(TI).filter_by(
Review Comment:
`filter_by` is legacy SQLAlchemy 1.x-style syntax; it would be better to
replace it with `where`.
```suggestion
query = (
select(TI)
.where(
TI.dag_id == dr.dag_id,
TI.run_id == dr.run_id,
TI.task_id == "mapped_print_value",
TI.state == TaskInstanceState.SUCCESS,
)
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]