uranusjr commented on code in PR #31128:
URL: https://github.com/apache/airflow/pull/31128#discussion_r1222640989


##########
airflow/lineage/__init__.py:
##########
@@ -133,13 +133,20 @@ def wrapper(self, context, *args, **kwargs):
 
             # Remove auto and task_ids
             self.inlets = [i for i in self.inlets if not isinstance(i, str)]
-            _inlets = self.xcom_pull(context, task_ids=task_ids, 
dag_id=self.dag_id, key=PIPELINE_OUTLETS)
-
-            # re-instantiate the obtained inlets
-            # xcom_pull returns a list of items for each given task_id
-            _inlets = [item for item in itertools.chain.from_iterable(_inlets)]
-
-            self.inlets.extend(_inlets)
+            # We manually create a session here since xcom_pull returns a 
LazyXComAccess iterator.
+            # If we do not pass a session a new session will be created, 
however that session will not be
+            # properly closed and will remain open. After we are done 
iterating we can safely close this
+            # session.
+            with create_session() as session:
+                _inlets = self.xcom_pull(
+                    context, task_ids=task_ids, dag_id=self.dag_id, 
key=PIPELINE_OUTLETS, session=session
+                )

Review Comment:
   Maybe a comment above this block would help future readers understand the 
context better. 
   
   (Also I think the blank line + comments in lines 144-146 might actually hurt 
readability a bit since they make the two lines feel less associated. Maybe 
moving them above the entire block to make the `create_session`, `xcom_pull`, 
and`inlets.extend` lines stick close together would help as well.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to