ephraimbuddy commented on code in PR #27491:
URL: https://github.com/apache/airflow/pull/27491#discussion_r1024864920
##########
airflow/models/abstractoperator.py:
##########
@@ -422,6 +424,121 @@ def get_mapped_ti_count(self, run_id: str, *, session:
Session) -> int:
counts = (g.get_mapped_ti_count(run_id, session=session) for g in
mapped_task_groups)
return functools.reduce(operator.mul, counts)
+ def expand_mapped_task(self, run_id: str, *, session: Session) ->
tuple[Sequence[TaskInstance], int]:
+ """Create the mapped task instances for mapped task.
+
+ :raise NotMapped: If this task does not need expansion.
+ :return: The newly created mapped task instances (if any) in ascending
+ order by map index, and the maximum map index value.
+ """
+ from sqlalchemy import func, or_
+
+ from airflow.models.baseoperator import BaseOperator
+ from airflow.models.mappedoperator import MappedOperator
+ from airflow.models.taskinstance import TaskInstance
+ from airflow.settings import task_instance_mutation_hook
+
+ if not isinstance(self, (BaseOperator, MappedOperator)):
+ raise RuntimeError(f"cannot expand unrecognized operator type
{type(self).__name__}")
+
+ try:
+ total_length: int | None = self.get_mapped_ti_count(run_id,
session=session)
+ except NotFullyPopulated as e:
+ # It's possible that the upstream tasks are not yet done, but we
+ # don't have upstream of upstreams in partial DAGs (possible in the
Review Comment:
In partial DAGs, the upstreams of the needed tasks (which are upstream of the
task being run) are empty, possibly removed, or don't really have upstreams.
##########
airflow/models/abstractoperator.py:
##########
@@ -422,6 +424,121 @@ def get_mapped_ti_count(self, run_id: str, *, session:
Session) -> int:
counts = (g.get_mapped_ti_count(run_id, session=session) for g in
mapped_task_groups)
return functools.reduce(operator.mul, counts)
+ def expand_mapped_task(self, run_id: str, *, session: Session) ->
tuple[Sequence[TaskInstance], int]:
+ """Create the mapped task instances for mapped task.
+
+ :raise NotMapped: If this task does not need expansion.
+ :return: The newly created mapped task instances (if any) in ascending
+ order by map index, and the maximum map index value.
+ """
+ from sqlalchemy import func, or_
+
+ from airflow.models.baseoperator import BaseOperator
+ from airflow.models.mappedoperator import MappedOperator
+ from airflow.models.taskinstance import TaskInstance
+ from airflow.settings import task_instance_mutation_hook
+
+ if not isinstance(self, (BaseOperator, MappedOperator)):
+ raise RuntimeError(f"cannot expand unrecognized operator type
{type(self).__name__}")
+
+ try:
+ total_length: int | None = self.get_mapped_ti_count(run_id,
session=session)
+ except NotFullyPopulated as e:
+ # It's possible that the upstream tasks are not yet done, but we
+ # don't have upstream of upstreams in partial DAGs (possible in the
Review Comment:
In partial DAGs, the upstreams of the needed tasks (which are upstream of the
task being run) are empty, possibly removed, or don't really have upstreams.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]