uranusjr commented on a change in pull request #22583:
URL: https://github.com/apache/airflow/pull/22583#discussion_r838813708



##########
File path: airflow/models/taskinstance.py
##########
@@ -819,17 +819,25 @@ def refresh_from_task(self, task: "Operator", 
pool_override=None):
         self.operator = task.task_type
 
     @provide_session
-    def clear_xcom_data(self, session=NEW_SESSION):
-        """
-        Clears all XCom data from the database for the task instance
+    def clear_xcom_data(self, session: Session = NEW_SESSION):
+        """Clear all XCom data from the database for the task instance.
+
+        If the task is unmapped, all XComs matching this task ID in the same 
DAG
+        run are removed. If the task is mapped, only the one with matching map
+        index is removed.
 
         :param session: SQLAlchemy ORM Session
         """
         self.log.debug("Clearing XCom data")
+        if self.map_index < 0:
+            map_index: Optional[int] = None
+        else:
+            map_index = self.map_index
         XCom.clear(
             dag_id=self.dag_id,
             task_id=self.task_id,
             run_id=self.run_id,
+            map_index=map_index,
             session=session,
         )

Review comment:
       Somewhat surprising we didn’t catch this earlier!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to