This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a6f859eb54 This fixes an issue were a database session got stuck 
(#31128)
a6f859eb54 is described below

commit a6f859eb540bcc168d99d66146a51902502c4217
Author: Stijn De Haes <[email protected]>
AuthorDate: Wed Jun 14 10:27:59 2023 +0200

    This fixes an issue were a database session got stuck (#31128)
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
    Co-authored-by: Ephraim Anierobi <[email protected]>
    Co-authored-by: eladkal <[email protected]>
---
 airflow/lineage/__init__.py    | 17 ++++++++++-------
 airflow/models/baseoperator.py |  8 +++++++-
 2 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/airflow/lineage/__init__.py b/airflow/lineage/__init__.py
index 173956b74c..22e7d82c09 100644
--- a/airflow/lineage/__init__.py
+++ b/airflow/lineage/__init__.py
@@ -25,11 +25,11 @@ from typing import TYPE_CHECKING, Any, Callable, TypeVar, 
cast
 
 from airflow.configuration import conf
 from airflow.lineage.backend import LineageBackend
+from airflow.utils.session import create_session
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
-
 PIPELINE_OUTLETS = "pipeline_outlets"
 PIPELINE_INLETS = "pipeline_inlets"
 AUTO = "auto"
@@ -133,13 +133,16 @@ def prepare_lineage(func: T) -> T:
 
             # Remove auto and task_ids
             self.inlets = [i for i in self.inlets if not isinstance(i, str)]
-            _inlets = self.xcom_pull(context, task_ids=task_ids, 
dag_id=self.dag_id, key=PIPELINE_OUTLETS)
-
-            # re-instantiate the obtained inlets
-            # xcom_pull returns a list of items for each given task_id
-            _inlets = [item for item in itertools.chain.from_iterable(_inlets)]
 
-            self.inlets.extend(_inlets)
+            # We manually create a session here since xcom_pull returns a 
LazyXComAccess iterator.
+            # If we do not pass a session a new session will be created, 
however that session will not be
+            # properly closed and will remain open. After we are done 
iterating we can safely close this
+            # session.
+            with create_session() as session:
+                _inlets = self.xcom_pull(
+                    context, task_ids=task_ids, dag_id=self.dag_id, 
key=PIPELINE_OUTLETS, session=session
+                )
+                self.inlets.extend(i for i in 
itertools.chain.from_iterable(_inlets))
 
         elif self.inlets:
             raise AttributeError("inlets is not a list, operator, string or 
attr annotated object")
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 6392ef9697..84aa9a17eb 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1477,12 +1477,14 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
         context["ti"].xcom_push(key=key, value=value, 
execution_date=execution_date)
 
     @staticmethod
+    @provide_session
     def xcom_pull(
         context: Any,
         task_ids: str | list[str] | None = None,
         dag_id: str | None = None,
         key: str = XCOM_RETURN_KEY,
         include_prior_dates: bool | None = None,
+        session: Session = NEW_SESSION,
     ) -> Any:
         """
         Pull XComs that optionally meet certain criteria.
@@ -1511,7 +1513,11 @@ class BaseOperator(AbstractOperator, 
metaclass=BaseOperatorMeta):
             are returned as well.
         """
         return context["ti"].xcom_pull(
-            key=key, task_ids=task_ids, dag_id=dag_id, 
include_prior_dates=include_prior_dates
+            key=key,
+            task_ids=task_ids,
+            dag_id=dag_id,
+            include_prior_dates=include_prior_dates,
+            session=session,
         )
 
     @classmethod

Reply via email to