Lee-W commented on code in PR #57474:
URL: https://github.com/apache/airflow/pull/57474#discussion_r2485422979
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -285,30 +289,55 @@ def ti_run(
def _get_upstream_map_indexes(
- task: MappedOperator | SerializedBaseOperator, ti_map_index: int, run_id:
str, session: SessionDep
+ *,
+ serialized_dag: SerializedDAG,
+ ti: TI,
+ session: SessionDep,
) -> Iterator[tuple[str, int | list[int] | None]]:
- task_mapped_group = task.get_closest_mapped_task_group()
+ task = serialized_dag.get_task(ti.task_id)
for upstream_task in task.upstream_list:
- upstream_mapped_group = upstream_task.get_closest_mapped_task_group()
map_indexes: int | list[int] | None
- if upstream_mapped_group is None:
+ if (upstream_mapped_group :=
upstream_task.get_closest_mapped_task_group()) is None:
# regular tasks or non-mapped task groups
map_indexes = None
- elif task_mapped_group == upstream_mapped_group:
+ elif task.get_closest_mapped_task_group() == upstream_mapped_group:
# tasks in the same mapped task group hierarchy
- map_indexes = ti_map_index
+ map_indexes = ti.map_index
else:
# tasks not in the same mapped task group
# the upstream mapped task group should combine the return xcom as
a list and return it
- mapped_ti_count: int
+ mapped_ti_count: int | None = None
+
try:
- # for cases that does not need to resolve xcom
+ # First try: without resolving XCom
mapped_ti_count =
upstream_mapped_group.get_parse_time_mapped_ti_count()
except NotFullyPopulated:
- # for cases that needs to resolve xcom to get the correct count
- mapped_ti_count = cast(
- "SchedulerExpandInput", upstream_mapped_group._expand_input
- ).get_total_map_length(run_id, session=session)
+ # Second try: resolve XCom for correct count
+ try:
+ expand_input = cast("SchedulerExpandInput",
upstream_mapped_group._expand_input)
+ mapped_ti_count =
expand_input.get_total_map_length(ti.run_id, session=session)
+ except NotFullyPopulated:
+ # For these trigger rules, unresolved map indexes are
acceptable.
+ # The success of the upstream task is not the main reason
for triggering the current task.
+ # Therefore, whether the upstream task is fully populated
can be ignored.
+ allowed_rules = {
+ TriggerRule.ALL_FAILED,
+ TriggerRule.ALL_DONE,
+ TriggerRule.ALL_DONE_MIN_ONE_SUCCESS,
+ TriggerRule.ALL_DONE_SETUP_SUCCESS,
+ TriggerRule.ONE_SUCCESS,
+ TriggerRule.ONE_FAILED,
+ TriggerRule.ONE_DONE,
+ TriggerRule.NONE_FAILED,
+ TriggerRule.NONE_SKIPPED,
+ TriggerRule.ALWAYS,
+ TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
+ TriggerRule.ALL_SKIPPED,
+ }
+ if task.trigger_rule in allowed_rules:
+ mapped_ti_count = None
Review Comment:
but if we still want to change it to not success, I can do that as well :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]