ephraimbuddy commented on code in PR #31128:
URL: https://github.com/apache/airflow/pull/31128#discussion_r1221071435


##########
airflow/lineage/__init__.py:
##########
@@ -133,13 +133,20 @@ def wrapper(self, context, *args, **kwargs):
 
             # Remove auto and task_ids
             self.inlets = [i for i in self.inlets if not isinstance(i, str)]
-            _inlets = self.xcom_pull(context, task_ids=task_ids, 
dag_id=self.dag_id, key=PIPELINE_OUTLETS)
-
-            # re-instantiate the obtained inlets
-            # xcom_pull returns a list of items for each given task_id
-            _inlets = [item for item in itertools.chain.from_iterable(_inlets)]
-
-            self.inlets.extend(_inlets)
+            # We manually create a session here since xcom_pull returns a 
LazyXComAccess iterator.
+            # If we do not pass a session a new session will be created, 
however that session will not be
+            # properly closed and will remain open. After we are done 
iterating we can safely close this
+            # session.
+            with create_session() as session:
+                _inlets = self.xcom_pull(
+                    context, task_ids=task_ids, dag_id=self.dag_id, 
key=PIPELINE_OUTLETS, session=session
+                )

Review Comment:
   ```suggestion
                _inlets = self.xcom_pull(
                       context, task_ids=task_ids, dag_id=self.dag_id, 
key=PIPELINE_OUTLETS
                   )
   ```
   The call to `create_session` is no longer needed since you decorated 
`xcom_pull` with `@provide_session`. The session will be automatically provided



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to