uranusjr commented on code in PR #23667:
URL: https://github.com/apache/airflow/pull/23667#discussion_r885299173


##########
airflow/models/dagrun.py:
##########
@@ -953,6 +972,37 @@ def expand_mapped_literals(task: "Operator") -> Tuple["Operator", Sequence[int]]
             # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
             session.rollback()
 
+    def should_verify_integrity(self, dag, tis, *, session) -> bool:
+        """
+        Here we check if the length of the mapped task instances changed
+        at runtime. If so, we need to verify the integrity of the mapped
+        tasks.
+        """
+        existing_indexes: Dict["MappedOperator", list] = defaultdict(list)
+        new_indexes: Dict["MappedOperator", list] = defaultdict(list)
+        for ti in tis:
+            try:
+                ti.task = dag.get_task(ti.task_id)
+                task = ti.task
+            except TaskNotFound:
+                self.log.error("Failed to get task '%s' for dag '%s'. Marking it as removed.", ti, ti.dag_id)
+
+                ti.state = State.REMOVED
+                session.flush()
+                continue
+            if not task.is_mapped:
+                continue
+            # skip unexpanded tasks and also tasks that expands with literal arguments
+            if ti.map_index < 0 or task.parse_time_mapped_ti_count:
+                continue
+            existing_indexes[task].append(ti.map_index)
+            task.run_time_mapped_ti_count.cache_clear()
+            new_length = task.run_time_mapped_ti_count(self.run_id, session=session) or 0
+            new_indexes.update({task: list(range(new_length))})
+        missing_indexes: Dict["MappedOperator", list] = defaultdict(list)
+        [missing_indexes.update({k: list(set(new_indexes[k]) - set(v))}) for k, v in existing_indexes.items()]
+        return len(missing_indexes.values()) > 0

Review Comment:
   Since `verify_integrity` has to calculate `missing_indexes` again, the two
functions duplicate work. Some refactoring is likely needed so that the
calculation is done in one place.
   
   Setting `ti.state` in this function is also a red flag; a function that
checks things should not have side effects.
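
   One possible shape for that refactoring, as a rough sketch only: pull the
index comparison into a pure helper that both `should_verify_integrity` and
`verify_integrity` could call. The helper name `find_missing_map_indexes` and
its plain-dict inputs are hypothetical and stand in for the real task and
task-instance objects; nothing here is taken from the Airflow codebase.

```python
from typing import Dict, Hashable, Iterable, List, Mapping


def find_missing_map_indexes(
    existing: Mapping[Hashable, Iterable[int]],
    expected_lengths: Mapping[Hashable, int],
) -> Dict[Hashable, List[int]]:
    """Return, per task, the map indexes that should exist but do not.

    Hypothetical helper: it only reads its arguments and builds a new dict,
    so it has no side effects. Tasks with nothing missing are left out.
    """
    missing: Dict[Hashable, List[int]] = {}
    for task, indexes in existing.items():
        expected = range(expected_lengths.get(task, 0))
        gap = sorted(set(expected) - set(indexes))
        if gap:
            missing[task] = gap
    return missing


def should_verify_integrity(
    existing: Mapping[Hashable, Iterable[int]],
    expected_lengths: Mapping[Hashable, int],
) -> bool:
    """Check only: True when at least one task is missing a map index."""
    return bool(find_missing_map_indexes(existing, expected_lengths))


# Example with made-up task names: "a" has grown from 2 to 4 mapped
# instances at runtime, while "b" is unchanged.
existing = {"a": [0, 1], "b": [0]}
expected_lengths = {"a": 4, "b": 1}
print(find_missing_map_indexes(existing, expected_lengths))
print(should_verify_integrity(existing, expected_lengths))
```

   Because the helper omits tasks with no gap, an empty result means there is
nothing to do, and the caller that actually repairs things can reuse the same
dict instead of recomputing it. Marking task instances as removed would then
live in the repairing function rather than in the check.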



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
