uranusjr commented on code in PR #27491:
URL: https://github.com/apache/airflow/pull/27491#discussion_r1024806944
##########
airflow/models/abstractoperator.py:
##########
@@ -422,6 +424,121 @@ def get_mapped_ti_count(self, run_id: str, *, session:
Session) -> int:
counts = (g.get_mapped_ti_count(run_id, session=session) for g in
mapped_task_groups)
return functools.reduce(operator.mul, counts)
+ def expand_mapped_task(self, run_id: str, *, session: Session) ->
tuple[Sequence[TaskInstance], int]:
+ """Create the mapped task instances for mapped task.
+
+ :raise NotMapped: If this task does not need expansion.
+ :return: The newly created mapped task instances (if any) in ascending
+ order by map index, and the maximum map index value.
+ """
+ from sqlalchemy import func, or_
+
+ from airflow.models.baseoperator import BaseOperator
+ from airflow.models.mappedoperator import MappedOperator
+ from airflow.models.taskinstance import TaskInstance
+ from airflow.settings import task_instance_mutation_hook
+
+ if not isinstance(self, (BaseOperator, MappedOperator)):
+ raise RuntimeError(f"cannot expand unrecognized operator type
{type(self).__name__}")
+
+ try:
+ total_length: int | None = self.get_mapped_ti_count(run_id,
session=session)
+ except NotFullyPopulated as e:
+ # It's possible that the upstream tasks are not yet done, but we
+ # don't have upstream of upstreams in partial DAGs (possible in the
+ # mini-scheduler), so we ignore this exception.
+ if not self.dag or not self.dag.partial:
+ self.log.error(
+ "Cannot expand %r for run %s; missing upstream values: %s",
+ self,
+ run_id,
+ sorted(e.missing),
+ )
+ total_length = None
+
+ state: TaskInstanceState | None = None
+ unmapped_ti: TaskInstance | None = (
+ session.query(TaskInstance)
+ .filter(
+ TaskInstance.dag_id == self.dag_id,
+ TaskInstance.task_id == self.task_id,
+ TaskInstance.run_id == run_id,
+ TaskInstance.map_index == -1,
+ or_(TaskInstance.state.in_(State.unfinished),
TaskInstance.state.is_(None)),
+ )
+ .one_or_none()
+ )
+
+ all_expanded_tis: list[TaskInstance] = []
+
+ if unmapped_ti:
+ # The unmapped task instance still exists and is unfinished, i.e.
we
+ # haven't tried to run it before.
Review Comment:
The TI would have been expanded before failing. The only scenario in which this
comment is _technically_ untrue is if the scheduler crashes while trying to
expand a TI, which kind of qualifies as “not run” if you squint, and should be
treated as a bug anyway.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]