This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 22cb7410b5c Make sqlalchemy optional dependency for openlineage provider (#59921)
22cb7410b5c is described below

commit 22cb7410b5c6b668be808732f093da3450b74383
Author: Ankit Chaurasia <[email protected]>
AuthorDate: Wed Dec 31 21:01:10 2025 +0545

    Make sqlalchemy optional dependency for openlineage provider (#59921)
    
    * Make sqlalchemy optional dependency for openlineage provider
    
    * Refactor sqlalchemy imports to be lazy in openlineage utils
    
    * The function is already guarded by 'if not AIRFLOW_V_3_0_PLUS:' so the
    inner check for AIRFLOW_V_3_0_PLUS was unreachable dead code.
    
    * Remove sqlalchemy-specific ImportError handling in get_openlineage_facets_with_sql
---
 providers/openlineage/pyproject.toml                 |  7 +++++++
 .../src/airflow/providers/openlineage/utils/sql.py   | 18 +++++++++++++++++-
 .../src/airflow/providers/openlineage/utils/utils.py | 20 ++++++++------------
 3 files changed, 32 insertions(+), 13 deletions(-)

diff --git a/providers/openlineage/pyproject.toml b/providers/openlineage/pyproject.toml
index 1bec7f737fe..a3f6acded82 100644
--- a/providers/openlineage/pyproject.toml
+++ b/providers/openlineage/pyproject.toml
@@ -66,6 +66,13 @@ dependencies = [
     "openlineage-python>=1.41.0",
 ]
 
+# The optional dependencies should be modified in place in the generated file
+# Any change in the dependencies is preserved when the file is regenerated
+[project.optional-dependencies]
+"sqlalchemy" = [
+    "sqlalchemy>=1.4.49",
+]
+
 [dependency-groups]
 dev = [
     "apache-airflow",
diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/sql.py b/providers/openlineage/src/airflow/providers/openlineage/utils/sql.py
index c7ebf05c3b6..903dc70a7c4 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/sql.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/sql.py
@@ -25,9 +25,11 @@ from typing import TYPE_CHECKING
 from attrs import define
 from openlineage.client.event_v2 import Dataset
 from openlineage.client.facet_v2 import schema_dataset
-from sqlalchemy import Column, MetaData, Table, and_, or_, union_all
+
+from airflow.exceptions import AirflowOptionalProviderFeatureException
 
 if TYPE_CHECKING:
+    from sqlalchemy import Table
     from sqlalchemy.engine import Engine
     from sqlalchemy.sql.elements import ColumnElement
 
@@ -157,6 +159,13 @@ def create_information_schema_query(
     sqlalchemy_engine: Engine | None = None,
 ) -> str:
     """Create query for getting table schemas from information schema."""
+    try:
+        from sqlalchemy import Column, MetaData, Table, union_all
+    except ImportError:
+        raise AirflowOptionalProviderFeatureException(
+            "sqlalchemy is required for SQL schema query generation. "
+            "Install it with: pip install 'apache-airflow-providers-openlineage[sqlalchemy]'"
+        )
     metadata = MetaData()
     select_statements = []
     # Don't iterate over tables hierarchy, just pass it to query single information schema table
@@ -217,6 +226,13 @@ def create_filter_clauses(
         therefore it is expected the table has them defined.
     :param uppercase_names: if True use schema and table names uppercase
     """
+    try:
+        from sqlalchemy import and_, or_
+    except ImportError:
+        raise AirflowOptionalProviderFeatureException(
+            "sqlalchemy is required for SQL filter clause generation. "
+            "Install it with: pip install 'apache-airflow-providers-openlineage[sqlalchemy]'"
+        )
     table_schema_column_name = information_schema_table.columns[ColumnIndex.SCHEMA].name
     table_name_column_name = information_schema_table.columns[ColumnIndex.TABLE_NAME].name
     try:
diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index ffd49462ef5..91ccfcd13b1 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -31,6 +31,7 @@ from openlineage.client.facet_v2 import parent_run
 from openlineage.client.utils import RedactMixin
 
 from airflow import __version__ as AIRFLOW_VERSION
+from airflow.exceptions import AirflowOptionalProviderFeatureException
 
 # TODO: move this maybe to Airflow's logic?
 from airflow.models import DagRun, TaskInstance, TaskReschedule
@@ -537,24 +538,19 @@ if not AIRFLOW_V_3_0_PLUS:
 
     @provide_session
     def is_ti_rescheduled_already(ti: TaskInstance, session=NEW_SESSION):
-        from sqlalchemy import exists, select
+        try:
+            from sqlalchemy import exists, select
+        except ImportError:
+            raise AirflowOptionalProviderFeatureException(
+                "sqlalchemy is required for checking task instance reschedule status. "
+                "Install it with: pip install 'apache-airflow-providers-openlineage[sqlalchemy]'"
+            )
 
         if not isinstance(ti.task, BaseSensorOperator):
             return False
 
         if not ti.task.reschedule:
             return False
-        if AIRFLOW_V_3_0_PLUS:
-            return (
-                session.scalar(
-                    select(
-                        exists().where(
-                            TaskReschedule.ti_id == ti.id, TaskReschedule.try_number == ti.try_number
-                        )
-                    )
-                )
-                is True
-            )
         return (
             session.scalar(
                 select(

Reply via email to