uranusjr commented on code in PR #27491:
URL: https://github.com/apache/airflow/pull/27491#discussion_r1024809018


##########
airflow/models/abstractoperator.py:
##########
@@ -422,6 +424,121 @@ def get_mapped_ti_count(self, run_id: str, *, session: 
Session) -> int:
         counts = (g.get_mapped_ti_count(run_id, session=session) for g in 
mapped_task_groups)
         return functools.reduce(operator.mul, counts)
 
+    def expand_mapped_task(self, run_id: str, *, session: Session) -> 
tuple[Sequence[TaskInstance], int]:
+        """Create the mapped task instances for mapped task.
+
+        :raise NotMapped: If this task does not need expansion.
+        :return: The newly created mapped task instances (if any) in ascending
+            order by map index, and the maximum map index value.
+        """
+        from sqlalchemy import func, or_
+
+        from airflow.models.baseoperator import BaseOperator
+        from airflow.models.mappedoperator import MappedOperator
+        from airflow.models.taskinstance import TaskInstance
+        from airflow.settings import task_instance_mutation_hook
+
+        if not isinstance(self, (BaseOperator, MappedOperator)):
+            raise RuntimeError(f"cannot expand unrecognized operator type 
{type(self).__name__}")
+
+        try:
+            total_length: int | None = self.get_mapped_ti_count(run_id, 
session=session)
+        except NotFullyPopulated as e:
+            # It's possible that the upstream tasks are not yet done, but we
+            # don't have upstream of upstreams in partial DAGs (possible in the

Review Comment:
   The context to this and most of the other comments you mentioned is #27506.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to