This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 22cb7410b5c Make sqlalchemy optional dependency for openlineage provider (#59921)
22cb7410b5c is described below
commit 22cb7410b5c6b668be808732f093da3450b74383
Author: Ankit Chaurasia <[email protected]>
AuthorDate: Wed Dec 31 21:01:10 2025 +0545
Make sqlalchemy optional dependency for openlineage provider (#59921)
* Make sqlalchemy optional dependency for openlineage provider
* Refactor sqlalchemy imports to be lazy in openlineage utils
* Remove the inner AIRFLOW_V_3_0_PLUS check from is_ti_rescheduled_already: the
function is already guarded by 'if not AIRFLOW_V_3_0_PLUS:', so that inner
branch was unreachable dead code.
* Remove sqlalchemy-specific ImportError handling in
get_openlineage_facets_with_sql
---
providers/openlineage/pyproject.toml | 7 +++++++
.../src/airflow/providers/openlineage/utils/sql.py | 18 +++++++++++++++++-
.../src/airflow/providers/openlineage/utils/utils.py | 20 ++++++++------------
3 files changed, 32 insertions(+), 13 deletions(-)
diff --git a/providers/openlineage/pyproject.toml
b/providers/openlineage/pyproject.toml
index 1bec7f737fe..a3f6acded82 100644
--- a/providers/openlineage/pyproject.toml
+++ b/providers/openlineage/pyproject.toml
@@ -66,6 +66,13 @@ dependencies = [
"openlineage-python>=1.41.0",
]
+# The optional dependencies should be modified in place in the generated file
+# Any change in the dependencies is preserved when the file is regenerated
+[project.optional-dependencies]
+"sqlalchemy" = [
+ "sqlalchemy>=1.4.49",
+]
+
[dependency-groups]
dev = [
"apache-airflow",
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/utils/sql.py
b/providers/openlineage/src/airflow/providers/openlineage/utils/sql.py
index c7ebf05c3b6..903dc70a7c4 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/sql.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/sql.py
@@ -25,9 +25,11 @@ from typing import TYPE_CHECKING
from attrs import define
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import schema_dataset
-from sqlalchemy import Column, MetaData, Table, and_, or_, union_all
+
+from airflow.exceptions import AirflowOptionalProviderFeatureException
if TYPE_CHECKING:
+ from sqlalchemy import Table
from sqlalchemy.engine import Engine
from sqlalchemy.sql.elements import ColumnElement
@@ -157,6 +159,13 @@ def create_information_schema_query(
sqlalchemy_engine: Engine | None = None,
) -> str:
"""Create query for getting table schemas from information schema."""
+ try:
+ from sqlalchemy import Column, MetaData, Table, union_all
+ except ImportError:
+ raise AirflowOptionalProviderFeatureException(
+ "sqlalchemy is required for SQL schema query generation. "
+ "Install it with: pip install
'apache-airflow-providers-openlineage[sqlalchemy]'"
+ )
metadata = MetaData()
select_statements = []
# Don't iterate over tables hierarchy, just pass it to query single
information schema table
@@ -217,6 +226,13 @@ def create_filter_clauses(
therefore it is expected the table has them defined.
:param uppercase_names: if True use schema and table names uppercase
"""
+ try:
+ from sqlalchemy import and_, or_
+ except ImportError:
+ raise AirflowOptionalProviderFeatureException(
+ "sqlalchemy is required for SQL filter clause generation. "
+ "Install it with: pip install
'apache-airflow-providers-openlineage[sqlalchemy]'"
+ )
table_schema_column_name =
information_schema_table.columns[ColumnIndex.SCHEMA].name
table_name_column_name =
information_schema_table.columns[ColumnIndex.TABLE_NAME].name
try:
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index ffd49462ef5..91ccfcd13b1 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -31,6 +31,7 @@ from openlineage.client.facet_v2 import parent_run
from openlineage.client.utils import RedactMixin
from airflow import __version__ as AIRFLOW_VERSION
+from airflow.exceptions import AirflowOptionalProviderFeatureException
# TODO: move this maybe to Airflow's logic?
from airflow.models import DagRun, TaskInstance, TaskReschedule
@@ -537,24 +538,19 @@ if not AIRFLOW_V_3_0_PLUS:
@provide_session
def is_ti_rescheduled_already(ti: TaskInstance, session=NEW_SESSION):
- from sqlalchemy import exists, select
+ try:
+ from sqlalchemy import exists, select
+ except ImportError:
+ raise AirflowOptionalProviderFeatureException(
+ "sqlalchemy is required for checking task instance reschedule
status. "
+ "Install it with: pip install
'apache-airflow-providers-openlineage[sqlalchemy]'"
+ )
if not isinstance(ti.task, BaseSensorOperator):
return False
if not ti.task.reschedule:
return False
- if AIRFLOW_V_3_0_PLUS:
- return (
- session.scalar(
- select(
- exists().where(
- TaskReschedule.ti_id == ti.id,
TaskReschedule.try_number == ti.try_number
- )
- )
- )
- is True
- )
return (
session.scalar(
select(