This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cb31b654e32 Add OpenLineage support to JdbcHook (#67457)
cb31b654e32 is described below

commit cb31b654e3205b221a0abfb90048009060705861
Author: Rahul Madan <[email protected]>
AuthorDate: Wed May 27 16:09:10 2026 +0530

    Add OpenLineage support to JdbcHook (#67457)
    
    * Add OpenLineage support to JdbcHook
    
    * Wire jdbc -> openlineage cross-provider dependency for CI tracking
    
    * Address review: handle JDBC userinfo, Oracle service-name, and case-insensitive prefix
    
    Signed-off-by: Rahul Madan <[email protected]>
    
    ---------
    
    Signed-off-by: Rahul Madan <[email protected]>
---
 dev/breeze/tests/test_selective_checks.py          |   4 +-
 providers/jdbc/docs/index.rst                      |   1 +
 providers/jdbc/pyproject.toml                      |   8 ++
 .../jdbc/src/airflow/providers/jdbc/hooks/jdbc.py  |  82 +++++++++++
 providers/jdbc/tests/unit/jdbc/hooks/test_jdbc.py  | 152 +++++++++++++++++++++
 uv.lock                                            |   9 ++
 6 files changed, 254 insertions(+), 2 deletions(-)

diff --git a/dev/breeze/tests/test_selective_checks.py b/dev/breeze/tests/test_selective_checks.py
index d8cbd2737a9..69fcf6fd054 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -2316,7 +2316,7 @@ def test_expected_output_push(
             ),
             {
                 "selected-providers-list-as-string": "amazon common.compat 
common.io common.sql "
-                "databricks dbt.cloud ftp google microsoft.mssql mysql "
+                "databricks dbt.cloud ftp google jdbc microsoft.mssql mysql "
                 "openlineage oracle postgres sftp snowflake standard trino",
                 "all-python-versions": 
f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
                 "all-python-versions-list-as-string": 
DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
@@ -2335,7 +2335,7 @@ def test_expected_output_push(
                         {
                             "description": "amazon...standard",
                             "test_types": "Providers[amazon] 
Providers[common.compat,common.io,common.sql,"
-                            
"databricks,dbt.cloud,ftp,microsoft.mssql,mysql,openlineage,oracle,"
+                            
"databricks,dbt.cloud,ftp,jdbc,microsoft.mssql,mysql,openlineage,oracle,"
                             "postgres,sftp,snowflake,trino] Providers[google] 
Providers[standard]",
                         }
                     ]
diff --git a/providers/jdbc/docs/index.rst b/providers/jdbc/docs/index.rst
index 16226f696ab..cc3cf7e44ea 100644
--- a/providers/jdbc/docs/index.rst
+++ b/providers/jdbc/docs/index.rst
@@ -134,6 +134,7 @@ Dependent package
 
==================================================================================================================
  =================
 `apache-airflow-providers-common-compat 
<https://airflow.apache.org/docs/apache-airflow-providers-common-compat>`_  
``common.compat``
 `apache-airflow-providers-common-sql 
<https://airflow.apache.org/docs/apache-airflow-providers-common-sql>`_        
``common.sql``
+`apache-airflow-providers-openlineage 
<https://airflow.apache.org/docs/apache-airflow-providers-openlineage>`_      
``openlineage``
 
==================================================================================================================
  =================
 
 Downloading official packages
diff --git a/providers/jdbc/pyproject.toml b/providers/jdbc/pyproject.toml
index b1be60ab866..60ab6f4940e 100644
--- a/providers/jdbc/pyproject.toml
+++ b/providers/jdbc/pyproject.toml
@@ -78,6 +78,13 @@ dependencies = [
     "jpype1>=1.7.0; python_version >= '3.14'",
 ]
 
+# The optional dependencies should be modified in place in the generated file
+# Any change in the dependencies is preserved when the file is regenerated
+[project.optional-dependencies]
+"openlineage" = [
+    "apache-airflow-providers-openlineage"
+]
+
 [dependency-groups]
 dev = [
     "apache-airflow",
@@ -85,6 +92,7 @@ dev = [
     "apache-airflow-devel-common",
     "apache-airflow-providers-common-compat",
     "apache-airflow-providers-common-sql",
+    "apache-airflow-providers-openlineage",
     # Additional devel dependencies (do not remove this line and add extra development dependencies)
 ]
 
diff --git a/providers/jdbc/src/airflow/providers/jdbc/hooks/jdbc.py b/providers/jdbc/src/airflow/providers/jdbc/hooks/jdbc.py
index d286ea19607..a09376d4980 100644
--- a/providers/jdbc/src/airflow/providers/jdbc/hooks/jdbc.py
+++ b/providers/jdbc/src/airflow/providers/jdbc/hooks/jdbc.py
@@ -262,3 +262,85 @@ class JdbcHook(DbApiHook):
                 uri = f"{uri}?{query_string}"
 
         return uri
+
+    def get_openlineage_database_info(self, connection):
+        """
+        Return JDBC database information for OpenLineage.
+
+        Returns ``None`` when the database scheme cannot be inferred from the
+        connection's ``sqlalchemy_scheme`` extra or the JDBC URL in ``host``.
+        """
+        from airflow.providers.openlineage.sqlparser import DatabaseInfo
+
+        scheme = self._get_openlineage_scheme(connection)
+        if not scheme:
+            return None
+
+        return DatabaseInfo(
+            scheme=scheme,
+            authority=self._get_openlineage_authority(connection),
+            database=connection.schema or None,
+        )
+
+    def get_openlineage_database_dialect(self, connection) -> str:
+        """Return SQL dialect inferred from the JDBC connection, or 
``generic``."""
+        return self._get_openlineage_scheme(connection) or "generic"
+
+    def get_openlineage_default_schema(self) -> str | None:
+        """Return default schema from the connection."""
+        return self.connection.schema or None
+
+    @staticmethod
+    def _get_openlineage_scheme(connection) -> str | None:
+        """Infer scheme from ``sqlalchemy_scheme`` extra or JDBC URL prefix."""
+        raw: str | None = None
+        sqlalchemy_scheme = connection.extra_dejson.get("sqlalchemy_scheme")
+        if sqlalchemy_scheme:
+            raw = sqlalchemy_scheme.split("+")[0]
+        else:
+            host = connection.host or ""
+            # The ``jdbc:`` prefix is case-insensitive per the JDBC spec.
+            if host.lower().startswith("jdbc:"):
+                jdbc_part = host[5:]
+                for sep in ("://", ":"):
+                    if sep in jdbc_part:
+                        candidate = jdbc_part.split(sep)[0].lower()
+                        if candidate:
+                            raw = candidate
+                        break
+
+        if not raw:
+            return None
+        # Normalize SQLAlchemy's "postgresql" to OL's canonical "postgres" so
+        # JDBC-Postgres shares a namespace with PostgresHook downstream.
+        return "postgres" if raw == "postgresql" else raw
+
+    @staticmethod
+    def _get_openlineage_authority(connection) -> str | None:
+        """Extract ``host:port`` from a JDBC URL in ``host`` or from plain 
``host``/``port``."""
+        host = connection.host or ""
+        # The ``jdbc:`` prefix is case-insensitive per the JDBC spec.
+        if host.lower().startswith("jdbc:"):
+            rest = host[5:]
+            # Oracle thin service-name format (``jdbc:oracle:thin:@//host:1521/service``)
+            # uses ``:@//`` instead of ``://``. Handle before the ``://`` branch.
+            if "@//" in rest:
+                after = rest.split("@//", 1)[1]
+            elif "://" in rest:
+                after = rest.split("://", 1)[1]
+            else:
+                # Non-standard JDBC URL we can't reliably parse (e.g. Oracle
+                # SID format ``thin:@host:port:sid``, H2 ``mem:test``).
+                return None
+            # Strip path, query string, and driver-specific option separator
+            # (e.g. SQL Server uses ``;`` for connection properties).
+            for sep in ("/", "?", ";"):
+                after = after.split(sep, 1)[0]
+            # Strip userinfo (some drivers accept ``user:pass@host:port`` in the
+            # URL); we never want credentials leaking into the OL namespace.
+            if "@" in after:
+                after = after.rsplit("@", 1)[-1]
+            return after or None
+        if connection.port and host:
+            return f"{host}:{connection.port}"
+        return host or None
diff --git a/providers/jdbc/tests/unit/jdbc/hooks/test_jdbc.py b/providers/jdbc/tests/unit/jdbc/hooks/test_jdbc.py
index ff0f77b07bc..ff4fbe4733e 100644
--- a/providers/jdbc/tests/unit/jdbc/hooks/test_jdbc.py
+++ b/providers/jdbc/tests/unit/jdbc/hooks/test_jdbc.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+import importlib.util
 import json
 import logging
 import sqlite3
@@ -460,3 +461,154 @@ class TestJdbcHook:
 
         jdbc_hook = get_hook(**hook_params)
         assert jdbc_hook.get_uri() == expected_uri
+
+
+class TestJdbcHookOpenLineage:
+    """Static tests for the OpenLineage methods on JdbcHook."""
+
+    pytestmark = pytest.mark.skipif(
+        importlib.util.find_spec("airflow.providers.openlineage") is None,
+        reason="apache-airflow-providers-openlineage is not installed",
+    )
+
+    @pytest.mark.parametrize(
+        ("conn_params", "host", "port", "schema", "expected"),
+        [
+            # sqlalchemy_scheme extra (preferred path)
+            (
+                {"extra": json.dumps({"sqlalchemy_scheme": "postgresql"})},
+                "myhost",
+                5432,
+                "mydb",
+                {
+                    "scheme": "postgres",  # normalized from postgresql
+                    "authority": "myhost:5432",
+                    "database": "mydb",
+                },
+            ),
+            # sqlalchemy_scheme with driver — driver suffix stripped
+            (
+                {"extra": json.dumps({"sqlalchemy_scheme": 
"oracle+oracledb"})},
+                "oracle-host",
+                1521,
+                "ORCL",
+                {
+                    "scheme": "oracle",
+                    "authority": "oracle-host:1521",
+                    "database": "ORCL",
+                },
+            ),
+            # JDBC URL in host (no sqlalchemy_scheme extra)
+            (
+                {"extra": json.dumps({})},
+                "jdbc:mysql://mysql-host:3306/mydb",
+                None,
+                "mydb",
+                {
+                    "scheme": "mysql",
+                    "authority": "mysql-host:3306",
+                    "database": "mydb",
+                },
+            ),
+            # sqlalchemy_scheme passthrough for non-normalized dialects
+            (
+                {"extra": json.dumps({"sqlalchemy_scheme": "snowflake"})},
+                "account.snowflakecomputing.com",
+                443,
+                "WAREHOUSE_DB",
+                {
+                    "scheme": "snowflake",
+                    "authority": "account.snowflakecomputing.com:443",
+                    "database": "WAREHOUSE_DB",
+                },
+            ),
+        ],
+    )
+    def test_get_openlineage_database_info_returns_expected_fields(
+        self, conn_params, host, port, schema, expected
+    ):
+        hook = get_hook(host=host, port=port, schema=schema, 
conn_params=conn_params)
+        info = 
hook.get_openlineage_database_info(hook.get_connection("jdbc_default"))
+        assert info is not None
+        assert info.scheme == expected["scheme"]
+        assert info.authority == expected["authority"]
+        assert info.database == expected["database"]
+
+    def 
test_get_openlineage_database_info_returns_none_when_scheme_unknown(self):
+        """No sqlalchemy_scheme extra and host is not a JDBC URL — returns 
None.
+
+        OL then skips dataset event emission rather than emit events with an
+        unidentifiable namespace.
+        """
+        hook = get_hook(host="plain-host", port=1234, conn_params={"extra": 
json.dumps({})})
+        assert 
hook.get_openlineage_database_info(hook.get_connection("jdbc_default")) is None
+
+    @pytest.mark.parametrize(
+        ("conn_params", "host", "expected_dialect"),
+        [
+            # sqlalchemy_scheme path with normalization
+            ({"extra": json.dumps({"sqlalchemy_scheme": "postgresql"})}, "h", 
"postgres"),
+            ({"extra": json.dumps({"sqlalchemy_scheme": "mysql"})}, "h", 
"mysql"),
+            ({"extra": json.dumps({"sqlalchemy_scheme": "oracle+oracledb"})}, 
"h", "oracle"),
+            # JDBC URL fallback
+            ({"extra": json.dumps({})}, "jdbc:trino://h:8080/c", "trino"),
+            ({"extra": json.dumps({})}, "jdbc:postgresql://h:5432/db", 
"postgres"),
+            # ``jdbc:`` prefix is case-insensitive; extracted dialect is lower-cased
+            ({"extra": json.dumps({})}, "JDBC:POSTGRESQL://h:5432/db", 
"postgres"),
+            # neither path resolves -> generic fallback
+            ({"extra": json.dumps({})}, "plain-host", "generic"),
+            ({"extra": json.dumps({})}, "", "generic"),
+        ],
+    )
+    def test_get_openlineage_database_dialect(self, conn_params, host, 
expected_dialect):
+        hook = get_hook(host=host, port=None, conn_params=conn_params)
+        dialect = 
hook.get_openlineage_database_dialect(hook.get_connection("jdbc_default"))
+        assert dialect == expected_dialect
+
+    @pytest.mark.parametrize(
+        ("schema", "expected"),
+        [
+            ("mydb", "mydb"),
+            (None, None),
+            ("", None),
+        ],
+    )
+    def test_get_openlineage_default_schema(self, schema, expected):
+        hook = get_hook(schema=schema)
+        assert hook.get_openlineage_default_schema() == expected
+
+    @pytest.mark.parametrize(
+        ("host", "port", "expected_authority"),
+        [
+            # plain host + port (sqlalchemy-style setup)
+            ("myhost", 5432, "myhost:5432"),
+            # JDBC URL with embedded host:port
+            ("jdbc:postgresql://pg-host:5432/mydb", None, "pg-host:5432"),
+            # JDBC URL with query-string options after the database
+            ("jdbc:mysql://my-host:3306/mydb?useSSL=true", None, 
"my-host:3306"),
+            # SQL Server uses ``;`` to separate connection properties
+            ("jdbc:sqlserver://sql-host:1433;databaseName=db", None, 
"sql-host:1433"),
+            # plain host without port falls through to host-only
+            ("plain-host", None, "plain-host"),
+            # Non-standard JDBC URL with no ``://`` — unparsable, returns None
+            # (Oracle thin SID ``thin:@host:port:sid`` and H2 ``mem:test`` formats)
+            ("jdbc:oracle:thin:@host:1521:sid", None, None),
+            ("jdbc:h2:mem:test", None, None),
+            # Oracle thin service-name format uses ``@//`` instead of ``://``
+            ("jdbc:oracle:thin:@//ora-host:1521/service", None, 
"ora-host:1521"),
+            # Userinfo embedded in URL (legal for several drivers) is stripped
+            # so credentials never leak into the OL namespace
+            ("jdbc:postgresql://user:pass@pg-host:5432/mydb", None, 
"pg-host:5432"),
+            # ``jdbc:`` prefix is case-insensitive per the JDBC spec
+            ("JDBC:postgresql://pg-host:5432/mydb", None, "pg-host:5432"),
+        ],
+    )
+    def test_get_openlineage_authority_extraction(self, host, port, 
expected_authority):
+        hook = get_hook(
+            host=host,
+            port=port,
+            conn_params={"extra": json.dumps({"sqlalchemy_scheme": 
"postgresql"})},
+        )
+        info = 
hook.get_openlineage_database_info(hook.get_connection("jdbc_default"))
+        assert info is not None
+        assert info.authority == expected_authority
diff --git a/uv.lock b/uv.lock
index 471df443b5a..e3dc3f8ad4f 100644
--- a/uv.lock
+++ b/uv.lock
@@ -5847,12 +5847,18 @@ dependencies = [
     { name = "jpype1" },
 ]
 
+[package.optional-dependencies]
+openlineage = [
+    { name = "apache-airflow-providers-openlineage" },
+]
+
 [package.dev-dependencies]
 dev = [
     { name = "apache-airflow" },
     { name = "apache-airflow-devel-common" },
     { name = "apache-airflow-providers-common-compat" },
     { name = "apache-airflow-providers-common-sql" },
+    { name = "apache-airflow-providers-openlineage" },
     { name = "apache-airflow-task-sdk" },
 ]
 docs = [
@@ -5864,6 +5870,7 @@ requires-dist = [
     { name = "apache-airflow", editable = "." },
     { name = "apache-airflow-providers-common-compat", editable = 
"providers/common/compat" },
     { name = "apache-airflow-providers-common-sql", editable = 
"providers/common/sql" },
+    { name = "apache-airflow-providers-openlineage", marker = "extra == 
'openlineage'", editable = "providers/openlineage" },
     { name = "jaydebeapi", specifier = ">=1.1.1" },
     { name = "jpype1", marker = "python_full_version >= '3.14'", specifier = 
">=1.7.0" },
     { name = "jpype1", marker = "python_full_version == '3.10.*' and 
platform_machine == 'arm64' and sys_platform == 'darwin'", specifier = 
">=1.4.0,!=1.7.0" },
@@ -5875,6 +5882,7 @@ requires-dist = [
     { name = "jpype1", marker = "(python_full_version == '3.12.*' and 
platform_machine != 'arm64') or (python_full_version == '3.12.*' and 
sys_platform != 'darwin')", specifier = ">=1.5.0" },
     { name = "jpype1", marker = "(python_full_version == '3.13.*' and 
platform_machine != 'arm64') or (python_full_version == '3.13.*' and 
sys_platform != 'darwin')", specifier = ">=1.5.1" },
 ]
+provides-extras = ["openlineage"]
 
 [package.metadata.requires-dev]
 dev = [
@@ -5882,6 +5890,7 @@ dev = [
     { name = "apache-airflow-devel-common", editable = "devel-common" },
     { name = "apache-airflow-providers-common-compat", editable = 
"providers/common/compat" },
     { name = "apache-airflow-providers-common-sql", editable = 
"providers/common/sql" },
+    { name = "apache-airflow-providers-openlineage", editable = 
"providers/openlineage" },
     { name = "apache-airflow-task-sdk", editable = "task-sdk" },
 ]
 docs = [{ name = "apache-airflow-devel-common", extras = ["docs"], editable = 
"devel-common" }]

Reply via email to