dstandish commented on a change in pull request #22583:
URL: https://github.com/apache/airflow/pull/22583#discussion_r839170638
##########
File path: airflow/models/taskinstance.py
##########
@@ -2339,7 +2350,15 @@ def xcom_pull(
.order_by(None)
.order_by(XCom.map_index.asc())
)
- return (XCom.deserialize_value(r) for r in query)
+
+ def iter_xcom_values(query):
+ # The session passed to xcom_pull() may die before this is
+ # iterated through, so we need to bind to a new session.
+ with create_session() as session:
Review comment:
why create a new one?
##########
File path: airflow/models/taskinstance.py
##########
@@ -819,17 +819,25 @@ def refresh_from_task(self, task: "Operator",
pool_override=None):
self.operator = task.task_type
@provide_session
- def clear_xcom_data(self, session=NEW_SESSION):
- """
- Clears all XCom data from the database for the task instance
+ def clear_xcom_data(self, session: Session = NEW_SESSION):
+ """Clear all XCom data from the database for the task instance.
+
+ If the task is unmapped, all XComs matching this task ID in the same
DAG
+ run are removed. If the task is mapped, only the one with matching map
+ index is removed.
:param session: SQLAlchemy ORM Session
"""
self.log.debug("Clearing XCom data")
+ if self.map_index < 0:
+ map_index: Optional[int] = None
Review comment:
why convert -1 to None?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]