This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a6f859eb54 This fixes an issue where a database session got stuck
(#31128)
a6f859eb54 is described below
commit a6f859eb540bcc168d99d66146a51902502c4217
Author: Stijn De Haes <[email protected]>
AuthorDate: Wed Jun 14 10:27:59 2023 +0200
This fixes an issue where a database session got stuck (#31128)
Co-authored-by: Tzu-ping Chung <[email protected]>
Co-authored-by: Ephraim Anierobi <[email protected]>
Co-authored-by: eladkal <[email protected]>
---
airflow/lineage/__init__.py | 17 ++++++++++-------
airflow/models/baseoperator.py | 8 +++++++-
2 files changed, 17 insertions(+), 8 deletions(-)
diff --git a/airflow/lineage/__init__.py b/airflow/lineage/__init__.py
index 173956b74c..22e7d82c09 100644
--- a/airflow/lineage/__init__.py
+++ b/airflow/lineage/__init__.py
@@ -25,11 +25,11 @@ from typing import TYPE_CHECKING, Any, Callable, TypeVar,
cast
from airflow.configuration import conf
from airflow.lineage.backend import LineageBackend
+from airflow.utils.session import create_session
if TYPE_CHECKING:
from airflow.utils.context import Context
-
PIPELINE_OUTLETS = "pipeline_outlets"
PIPELINE_INLETS = "pipeline_inlets"
AUTO = "auto"
@@ -133,13 +133,16 @@ def prepare_lineage(func: T) -> T:
# Remove auto and task_ids
self.inlets = [i for i in self.inlets if not isinstance(i, str)]
- _inlets = self.xcom_pull(context, task_ids=task_ids,
dag_id=self.dag_id, key=PIPELINE_OUTLETS)
-
- # re-instantiate the obtained inlets
- # xcom_pull returns a list of items for each given task_id
- _inlets = [item for item in itertools.chain.from_iterable(_inlets)]
- self.inlets.extend(_inlets)
+ # We manually create a session here since xcom_pull returns a
LazyXComAccess iterator.
+ # If we do not pass a session a new session will be created,
however that session will not be
+ # properly closed and will remain open. After we are done
iterating we can safely close this
+ # session.
+ with create_session() as session:
+ _inlets = self.xcom_pull(
+ context, task_ids=task_ids, dag_id=self.dag_id,
key=PIPELINE_OUTLETS, session=session
+ )
+ self.inlets.extend(i for i in
itertools.chain.from_iterable(_inlets))
elif self.inlets:
raise AttributeError("inlets is not a list, operator, string or
attr annotated object")
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 6392ef9697..84aa9a17eb 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1477,12 +1477,14 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
context["ti"].xcom_push(key=key, value=value,
execution_date=execution_date)
@staticmethod
+ @provide_session
def xcom_pull(
context: Any,
task_ids: str | list[str] | None = None,
dag_id: str | None = None,
key: str = XCOM_RETURN_KEY,
include_prior_dates: bool | None = None,
+ session: Session = NEW_SESSION,
) -> Any:
"""
Pull XComs that optionally meet certain criteria.
@@ -1511,7 +1513,11 @@ class BaseOperator(AbstractOperator,
metaclass=BaseOperatorMeta):
are returned as well.
"""
return context["ti"].xcom_pull(
- key=key, task_ids=task_ids, dag_id=dag_id,
include_prior_dates=include_prior_dates
+ key=key,
+ task_ids=task_ids,
+ dag_id=dag_id,
+ include_prior_dates=include_prior_dates,
+ session=session,
)
@classmethod