uranusjr commented on code in PR #25757:
URL: https://github.com/apache/airflow/pull/25757#discussion_r947523883


##########
airflow/models/mappedoperator.py:
##########
@@ -682,17 +701,21 @@ def expand_mapped_task(self, run_id: str, *, session: 
Session) -> Tuple[Sequence
             ti.refresh_from_task(self)  # session.merge() loses task 
information.
             all_expanded_tis.append(ti)
 
+        # Coerce the None case to 0 -- these two are almost treated 
identically,
+        # except the unmapped ti (if exists) is marked to different states.
+        total_expanded_ti_count = total_length or 0
+
         # Set to "REMOVED" any (old) TaskInstances with map indices greater
         # than the current map value
         session.query(TaskInstance).filter(
             TaskInstance.dag_id == self.dag_id,
             TaskInstance.task_id == self.task_id,
             TaskInstance.run_id == run_id,
-            TaskInstance.map_index >= total_length,
+            TaskInstance.map_index >= total_expanded_ti_count,
         ).update({TaskInstance.state: TaskInstanceState.REMOVED})
 
         session.flush()
-        return all_expanded_tis, total_length
+        return all_expanded_tis, total_expanded_ti_count - 1

Review Comment:
   I think the old code introduces an off-by-one error? The return value is 
described as “the maximum map_index” (and is used as such in BackfillJob), 
which should be one less than the total expand length, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to