stijndehaes commented on code in PR #31128:
URL: https://github.com/apache/airflow/pull/31128#discussion_r1222626996
##########
airflow/lineage/__init__.py:
##########
@@ -133,13 +133,20 @@ def wrapper(self, context, *args, **kwargs):
# Remove auto and task_ids
self.inlets = [i for i in self.inlets if not isinstance(i, str)]
- _inlets = self.xcom_pull(context, task_ids=task_ids, dag_id=self.dag_id, key=PIPELINE_OUTLETS)
-
- # re-instantiate the obtained inlets
- # xcom_pull returns a list of items for each given task_id
- _inlets = [item for item in itertools.chain.from_iterable(_inlets)]
-
- self.inlets.extend(_inlets)
+ # We manually create a session here since xcom_pull returns a LazyXComAccess iterator.
+ # If we do not pass a session a new session will be created, however that session will not be
+ # properly closed and will remain open. After we are done iterating we can safely close this
+ # session.
+ with create_session() as session:
+ _inlets = self.xcom_pull(
+ context, task_ids=task_ids, dag_id=self.dag_id, key=PIPELINE_OUTLETS, session=session
+ )
Review Comment:
Removing the create_session will actually remove the bug fix. As explained
in the code comment above the create_session call, the issue is that xcom_pull
returns a LazyXComAccess iterator. By the time we iterate over it, we have
already returned from xcom_pull, so the session that was automatically created
there is already closed, and iterating starts a new session that is never
closed. That new session is the leak. Wrapping the call and the iteration in a
manually created session fixes it.
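
To make the mechanism concrete, here is a minimal toy model of the leak. It is
a sketch only, not Airflow's actual implementation: ToySession,
toy_create_session, ToyLazyAccess, toy_xcom_pull and open_sessions are all
hypothetical stand-ins, and the assumption that the lazy object opens its own
session at iteration time simply mirrors the behaviour described above.

```python
from contextlib import contextmanager

open_sessions = []  # hypothetical registry of sessions that are still open


class ToySession:
    """Stand-in for a database session; not Airflow's real Session."""

    def __init__(self):
        open_sessions.append(self)

    def close(self):
        open_sessions.remove(self)


@contextmanager
def toy_create_session():
    """Mimics the shape of a create_session() context manager."""
    session = ToySession()
    try:
        yield session
    finally:
        session.close()


class ToyLazyAccess:
    """Lazy result: no rows are fetched until someone iterates."""

    def __init__(self, rows, session=None):
        self._rows = rows
        self._session = session

    def __iter__(self):
        # With no caller-supplied session, open one at iteration time.
        # Nothing closes it afterwards, which is the leak in this toy model.
        if self._session is None:
            self._session = ToySession()
        return iter(self._rows)


def toy_xcom_pull(rows, session=None):
    """Returns immediately; the session is only needed later, on iteration."""
    return ToyLazyAccess(rows, session=session)


# Without an explicit session: iterating leaves one session open.
leaky = list(toy_xcom_pull(["a", "b"]))
leaked_count = len(open_sessions)

# With an explicit session: iterate inside the block, then it is closed.
open_sessions.clear()
with toy_create_session() as session:
    safe = list(toy_xcom_pull(["a", "b"], session=session))
remaining_count = len(open_sessions)
```

The important detail is that the iteration (here, the list(...) call) happens
inside the with block, so the only session in use is the one the caller owns
and closes.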