stijndehaes commented on code in PR #31128:
URL: https://github.com/apache/airflow/pull/31128#discussion_r1223062205
##########
airflow/lineage/__init__.py:
##########
@@ -133,13 +133,20 @@ def wrapper(self, context, *args, **kwargs):
# Remove auto and task_ids
self.inlets = [i for i in self.inlets if not isinstance(i, str)]
- _inlets = self.xcom_pull(context, task_ids=task_ids, dag_id=self.dag_id, key=PIPELINE_OUTLETS)
-
- # re-instantiate the obtained inlets
- # xcom_pull returns a list of items for each given task_id
- _inlets = [item for item in itertools.chain.from_iterable(_inlets)]
-
- self.inlets.extend(_inlets)
+ # We manually create a session here since xcom_pull returns a LazyXComAccess iterator.
+ # If we do not pass a session a new session will be created, however that session will not be
+ # properly closed and will remain open. After we are done iterating we can safely close this
+ # session.
+ with create_session() as session:
+ _inlets = self.xcom_pull(
+ context, task_ids=task_ids, dag_id=self.dag_id, key=PIPELINE_OUTLETS, session=session
+ )
Review Comment:
I removed the empty lines and the 2 comments. I don't think they added much here.
There is already a comment above the create_session block to explain things,
so I am not sure I correctly understand what you mean by `Maybe a comment
above this block would help future readers understand the context better.` Can
you clarify it for me?
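
For context, the pattern the diff comment describes can be sketched outside Airflow. This is only an illustration under assumptions: `FakeSession`, `managed_session`, `lazy_pull`, and `opened_sessions` are hypothetical stand-ins (for a database session, `create_session`, and a lazy `xcom_pull` result), not Airflow code. It shows that a lazy iterator which creates its own session leaves it open, while a caller-owned session is closed once iteration is done.

```python
from contextlib import contextmanager


class FakeSession:
    """Hypothetical stand-in for a database session; only tracks open/closed state."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


# Records every session created, so we can inspect its state afterwards.
opened_sessions = []


@contextmanager
def managed_session():
    """Hypothetical analogue of create_session(): closes the session on exit."""
    session = FakeSession()
    opened_sessions.append(session)
    try:
        yield session
    finally:
        session.close()


def lazy_pull(values, session=None):
    """Lazily yield values, as an iterator backed by a live session would.

    If the caller passes no session, one is created here and nobody closes it.
    """
    if session is None:
        session = FakeSession()
        opened_sessions.append(session)
    for value in values:
        yield value


def collect_without_session(values):
    # The iterator owns its session, which stays open after iteration.
    return list(lazy_pull(values))


def collect_with_session(values):
    # The caller owns the session and fully consumes the iterator
    # before the context manager closes it.
    with managed_session() as session:
        return list(lazy_pull(values, session=session))
```

Note that the iterator has to be consumed inside the `with` block; materializing it after the block exits would read from a session that is already closed.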