This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cb31b654e32 Add OpenLineage support to JdbcHook (#67457)
cb31b654e32 is described below
commit cb31b654e3205b221a0abfb90048009060705861
Author: Rahul Madan <[email protected]>
AuthorDate: Wed May 27 16:09:10 2026 +0530
Add OpenLineage support to JdbcHook (#67457)
* Add OpenLineage support to JdbcHook
* Declare openlineage as an optional cross-provider dependency of jdbc (``openlineage`` extra and dev dependency group) so CI selective checks track it
* Address review: strip userinfo from JDBC URLs, parse the Oracle thin
service-name form (``@//host:port/service``), and match the ``jdbc:``
prefix case-insensitively
Signed-off-by: Rahul Madan <[email protected]>
---------
Signed-off-by: Rahul Madan <[email protected]>
---
dev/breeze/tests/test_selective_checks.py | 4 +-
providers/jdbc/docs/index.rst | 1 +
providers/jdbc/pyproject.toml | 8 ++
.../jdbc/src/airflow/providers/jdbc/hooks/jdbc.py | 82 +++++++++++
providers/jdbc/tests/unit/jdbc/hooks/test_jdbc.py | 152 +++++++++++++++++++++
uv.lock | 9 ++
6 files changed, 254 insertions(+), 2 deletions(-)
diff --git a/dev/breeze/tests/test_selective_checks.py
b/dev/breeze/tests/test_selective_checks.py
index d8cbd2737a9..69fcf6fd054 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -2316,7 +2316,7 @@ def test_expected_output_push(
),
{
"selected-providers-list-as-string": "amazon common.compat
common.io common.sql "
- "databricks dbt.cloud ftp google microsoft.mssql mysql "
+ "databricks dbt.cloud ftp google jdbc microsoft.mssql mysql "
"openlineage oracle postgres sftp snowflake standard trino",
"all-python-versions":
f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
"all-python-versions-list-as-string":
DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
@@ -2335,7 +2335,7 @@ def test_expected_output_push(
{
"description": "amazon...standard",
"test_types": "Providers[amazon]
Providers[common.compat,common.io,common.sql,"
-
"databricks,dbt.cloud,ftp,microsoft.mssql,mysql,openlineage,oracle,"
+
"databricks,dbt.cloud,ftp,jdbc,microsoft.mssql,mysql,openlineage,oracle,"
"postgres,sftp,snowflake,trino] Providers[google]
Providers[standard]",
}
]
diff --git a/providers/jdbc/docs/index.rst b/providers/jdbc/docs/index.rst
index 16226f696ab..cc3cf7e44ea 100644
--- a/providers/jdbc/docs/index.rst
+++ b/providers/jdbc/docs/index.rst
@@ -134,6 +134,7 @@ Dependent package
==================================================================================================================
=================
`apache-airflow-providers-common-compat
<https://airflow.apache.org/docs/apache-airflow-providers-common-compat>`_
``common.compat``
`apache-airflow-providers-common-sql
<https://airflow.apache.org/docs/apache-airflow-providers-common-sql>`_
``common.sql``
+`apache-airflow-providers-openlineage
<https://airflow.apache.org/docs/apache-airflow-providers-openlineage>`_
``openlineage``
==================================================================================================================
=================
Downloading official packages
diff --git a/providers/jdbc/pyproject.toml b/providers/jdbc/pyproject.toml
index b1be60ab866..60ab6f4940e 100644
--- a/providers/jdbc/pyproject.toml
+++ b/providers/jdbc/pyproject.toml
@@ -78,6 +78,13 @@ dependencies = [
"jpype1>=1.7.0; python_version >= '3.14'",
]
+# The optional dependencies should be modified in place in the generated file
+# Any change in the dependencies is preserved when the file is regenerated
+[project.optional-dependencies]
+"openlineage" = [
+ "apache-airflow-providers-openlineage"
+]
+
[dependency-groups]
dev = [
"apache-airflow",
@@ -85,6 +92,7 @@ dev = [
"apache-airflow-devel-common",
"apache-airflow-providers-common-compat",
"apache-airflow-providers-common-sql",
+ "apache-airflow-providers-openlineage",
# Additional devel dependencies (do not remove this line and add extra
development dependencies)
]
diff --git a/providers/jdbc/src/airflow/providers/jdbc/hooks/jdbc.py
b/providers/jdbc/src/airflow/providers/jdbc/hooks/jdbc.py
index d286ea19607..a09376d4980 100644
--- a/providers/jdbc/src/airflow/providers/jdbc/hooks/jdbc.py
+++ b/providers/jdbc/src/airflow/providers/jdbc/hooks/jdbc.py
@@ -262,3 +262,85 @@ class JdbcHook(DbApiHook):
uri = f"{uri}?{query_string}"
return uri
+
+ def get_openlineage_database_info(self, connection):
+ """
+ Return JDBC database information for OpenLineage.
+
+ Returns ``None`` when the database scheme cannot be inferred from the
+ connection's ``sqlalchemy_scheme`` extra or the JDBC URL in ``host``.
+ """
+ from airflow.providers.openlineage.sqlparser import DatabaseInfo
+
+ scheme = self._get_openlineage_scheme(connection)
+ if not scheme:
+ return None
+
+ return DatabaseInfo(
+ scheme=scheme,
+ authority=self._get_openlineage_authority(connection),
+ database=connection.schema or None,
+ )
+
+ def get_openlineage_database_dialect(self, connection) -> str:
+ """Return SQL dialect inferred from the JDBC connection, or
``generic``."""
+ return self._get_openlineage_scheme(connection) or "generic"
+
+ def get_openlineage_default_schema(self) -> str | None:
+ """Return default schema from the connection."""
+ return self.connection.schema or None
+
+ @staticmethod
+ def _get_openlineage_scheme(connection) -> str | None:
+ """Infer scheme from ``sqlalchemy_scheme`` extra or JDBC URL prefix."""
+ raw: str | None = None
+ sqlalchemy_scheme = connection.extra_dejson.get("sqlalchemy_scheme")
+ if sqlalchemy_scheme:
+ raw = sqlalchemy_scheme.split("+")[0]
+ else:
+ host = connection.host or ""
+ # The ``jdbc:`` prefix is case-insensitive per the JDBC spec.
+ if host.lower().startswith("jdbc:"):
+ jdbc_part = host[5:]
+ for sep in ("://", ":"):
+ if sep in jdbc_part:
+ candidate = jdbc_part.split(sep)[0].lower()
+ if candidate:
+ raw = candidate
+ break
+
+ if not raw:
+ return None
+ # Normalize SQLAlchemy's "postgresql" to OL's canonical "postgres" so
+ # JDBC-Postgres shares a namespace with PostgresHook downstream.
+ return "postgres" if raw == "postgresql" else raw
+
+ @staticmethod
+ def _get_openlineage_authority(connection) -> str | None:
+ """Extract ``host:port`` from a JDBC URL in ``host`` or from plain
``host``/``port``."""
+ host = connection.host or ""
+ # The ``jdbc:`` prefix is case-insensitive per the JDBC spec.
+ if host.lower().startswith("jdbc:"):
+ rest = host[5:]
+ # Oracle thin service-name format
(``jdbc:oracle:thin:@//host:1521/service``)
+ # uses ``:@//`` instead of ``://``. Handle before the ``://``
branch.
+ if "@//" in rest:
+ after = rest.split("@//", 1)[1]
+ elif "://" in rest:
+ after = rest.split("://", 1)[1]
+ else:
+ # Non-standard JDBC URL we can't reliably parse (e.g. Oracle
+ # SID format ``thin:@host:port:sid``, H2 ``mem:test``).
+ return None
+ # Strip path, query string, and driver-specific option separator
+ # (e.g. SQL Server uses ``;`` for connection properties).
+ for sep in ("/", "?", ";"):
+ after = after.split(sep, 1)[0]
+ # Strip userinfo (some drivers accept ``user:pass@host:port`` in
the
+ # URL); we never want credentials leaking into the OL namespace.
+ if "@" in after:
+ after = after.rsplit("@", 1)[-1]
+ return after or None
+ if connection.port and host:
+ return f"{host}:{connection.port}"
+ return host or None
diff --git a/providers/jdbc/tests/unit/jdbc/hooks/test_jdbc.py
b/providers/jdbc/tests/unit/jdbc/hooks/test_jdbc.py
index ff0f77b07bc..ff4fbe4733e 100644
--- a/providers/jdbc/tests/unit/jdbc/hooks/test_jdbc.py
+++ b/providers/jdbc/tests/unit/jdbc/hooks/test_jdbc.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+import importlib.util
import json
import logging
import sqlite3
@@ -460,3 +461,154 @@ class TestJdbcHook:
jdbc_hook = get_hook(**hook_params)
assert jdbc_hook.get_uri() == expected_uri
+
+
+class TestJdbcHookOpenLineage:
+ """Static tests for the OpenLineage methods on JdbcHook."""
+
+ pytestmark = pytest.mark.skipif(
+ importlib.util.find_spec("airflow.providers.openlineage") is None,
+ reason="apache-airflow-providers-openlineage is not installed",
+ )
+
+ @pytest.mark.parametrize(
+ ("conn_params", "host", "port", "schema", "expected"),
+ [
+ # sqlalchemy_scheme extra (preferred path)
+ (
+ {"extra": json.dumps({"sqlalchemy_scheme": "postgresql"})},
+ "myhost",
+ 5432,
+ "mydb",
+ {
+ "scheme": "postgres", # normalized from postgresql
+ "authority": "myhost:5432",
+ "database": "mydb",
+ },
+ ),
+ # sqlalchemy_scheme with driver — driver suffix stripped
+ (
+ {"extra": json.dumps({"sqlalchemy_scheme":
"oracle+oracledb"})},
+ "oracle-host",
+ 1521,
+ "ORCL",
+ {
+ "scheme": "oracle",
+ "authority": "oracle-host:1521",
+ "database": "ORCL",
+ },
+ ),
+ # JDBC URL in host (no sqlalchemy_scheme extra)
+ (
+ {"extra": json.dumps({})},
+ "jdbc:mysql://mysql-host:3306/mydb",
+ None,
+ "mydb",
+ {
+ "scheme": "mysql",
+ "authority": "mysql-host:3306",
+ "database": "mydb",
+ },
+ ),
+ # sqlalchemy_scheme passthrough for non-normalized dialects
+ (
+ {"extra": json.dumps({"sqlalchemy_scheme": "snowflake"})},
+ "account.snowflakecomputing.com",
+ 443,
+ "WAREHOUSE_DB",
+ {
+ "scheme": "snowflake",
+ "authority": "account.snowflakecomputing.com:443",
+ "database": "WAREHOUSE_DB",
+ },
+ ),
+ ],
+ )
+ def test_get_openlineage_database_info_returns_expected_fields(
+ self, conn_params, host, port, schema, expected
+ ):
+ hook = get_hook(host=host, port=port, schema=schema,
conn_params=conn_params)
+ info =
hook.get_openlineage_database_info(hook.get_connection("jdbc_default"))
+ assert info is not None
+ assert info.scheme == expected["scheme"]
+ assert info.authority == expected["authority"]
+ assert info.database == expected["database"]
+
+ def
test_get_openlineage_database_info_returns_none_when_scheme_unknown(self):
+ """No sqlalchemy_scheme extra and host is not a JDBC URL — returns
None.
+
+ OL then skips dataset event emission rather than emit events with an
+ unidentifiable namespace.
+ """
+ hook = get_hook(host="plain-host", port=1234, conn_params={"extra":
json.dumps({})})
+ assert
hook.get_openlineage_database_info(hook.get_connection("jdbc_default")) is None
+
+ @pytest.mark.parametrize(
+ ("conn_params", "host", "expected_dialect"),
+ [
+ # sqlalchemy_scheme path with normalization
+ ({"extra": json.dumps({"sqlalchemy_scheme": "postgresql"})}, "h",
"postgres"),
+ ({"extra": json.dumps({"sqlalchemy_scheme": "mysql"})}, "h",
"mysql"),
+ ({"extra": json.dumps({"sqlalchemy_scheme": "oracle+oracledb"})},
"h", "oracle"),
+ # JDBC URL fallback
+ ({"extra": json.dumps({})}, "jdbc:trino://h:8080/c", "trino"),
+ ({"extra": json.dumps({})}, "jdbc:postgresql://h:5432/db",
"postgres"),
+ # ``jdbc:`` prefix is case-insensitive; extracted dialect is
lower-cased
+ ({"extra": json.dumps({})}, "JDBC:POSTGRESQL://h:5432/db",
"postgres"),
+ # neither path resolves -> generic fallback
+ ({"extra": json.dumps({})}, "plain-host", "generic"),
+ ({"extra": json.dumps({})}, "", "generic"),
+ ],
+ )
+ def test_get_openlineage_database_dialect(self, conn_params, host,
expected_dialect):
+ hook = get_hook(host=host, port=None, conn_params=conn_params)
+ dialect =
hook.get_openlineage_database_dialect(hook.get_connection("jdbc_default"))
+ assert dialect == expected_dialect
+
+ @pytest.mark.parametrize(
+ ("schema", "expected"),
+ [
+ ("mydb", "mydb"),
+ (None, None),
+ ("", None),
+ ],
+ )
+ def test_get_openlineage_default_schema(self, schema, expected):
+ hook = get_hook(schema=schema)
+ assert hook.get_openlineage_default_schema() == expected
+
+ @pytest.mark.parametrize(
+ ("host", "port", "expected_authority"),
+ [
+ # plain host + port (sqlalchemy-style setup)
+ ("myhost", 5432, "myhost:5432"),
+ # JDBC URL with embedded host:port
+ ("jdbc:postgresql://pg-host:5432/mydb", None, "pg-host:5432"),
+ # JDBC URL with query-string options after the database
+ ("jdbc:mysql://my-host:3306/mydb?useSSL=true", None,
"my-host:3306"),
+ # SQL Server uses ``;`` to separate connection properties
+ ("jdbc:sqlserver://sql-host:1433;databaseName=db", None,
"sql-host:1433"),
+ # plain host without port falls through to host-only
+ ("plain-host", None, "plain-host"),
+ # Non-standard JDBC URL with no ``://`` — unparsable, returns None
+ # (Oracle thin SID ``thin:@host:port:sid`` and H2 ``mem:test``
formats)
+ ("jdbc:oracle:thin:@host:1521:sid", None, None),
+ ("jdbc:h2:mem:test", None, None),
+ # Oracle thin service-name format uses ``@//`` instead of ``://``
+ ("jdbc:oracle:thin:@//ora-host:1521/service", None,
"ora-host:1521"),
+ # Userinfo embedded in URL (legal for several drivers) is stripped
+ # so credentials never leak into the OL namespace
+ ("jdbc:postgresql://user:pass@pg-host:5432/mydb", None,
"pg-host:5432"),
+ # ``jdbc:`` prefix is case-insensitive per the JDBC spec
+ ("JDBC:postgresql://pg-host:5432/mydb", None, "pg-host:5432"),
+ ],
+ )
+ def test_get_openlineage_authority_extraction(self, host, port,
expected_authority):
+ hook = get_hook(
+ host=host,
+ port=port,
+ conn_params={"extra": json.dumps({"sqlalchemy_scheme":
"postgresql"})},
+ )
+ info =
hook.get_openlineage_database_info(hook.get_connection("jdbc_default"))
+ assert info is not None
+ assert info.authority == expected_authority
diff --git a/uv.lock b/uv.lock
index 471df443b5a..e3dc3f8ad4f 100644
--- a/uv.lock
+++ b/uv.lock
@@ -5847,12 +5847,18 @@ dependencies = [
{ name = "jpype1" },
]
+[package.optional-dependencies]
+openlineage = [
+ { name = "apache-airflow-providers-openlineage" },
+]
+
[package.dev-dependencies]
dev = [
{ name = "apache-airflow" },
{ name = "apache-airflow-devel-common" },
{ name = "apache-airflow-providers-common-compat" },
{ name = "apache-airflow-providers-common-sql" },
+ { name = "apache-airflow-providers-openlineage" },
{ name = "apache-airflow-task-sdk" },
]
docs = [
@@ -5864,6 +5870,7 @@ requires-dist = [
{ name = "apache-airflow", editable = "." },
{ name = "apache-airflow-providers-common-compat", editable =
"providers/common/compat" },
{ name = "apache-airflow-providers-common-sql", editable =
"providers/common/sql" },
+ { name = "apache-airflow-providers-openlineage", marker = "extra ==
'openlineage'", editable = "providers/openlineage" },
{ name = "jaydebeapi", specifier = ">=1.1.1" },
{ name = "jpype1", marker = "python_full_version >= '3.14'", specifier =
">=1.7.0" },
{ name = "jpype1", marker = "python_full_version == '3.10.*' and
platform_machine == 'arm64' and sys_platform == 'darwin'", specifier =
">=1.4.0,!=1.7.0" },
@@ -5875,6 +5882,7 @@ requires-dist = [
{ name = "jpype1", marker = "(python_full_version == '3.12.*' and
platform_machine != 'arm64') or (python_full_version == '3.12.*' and
sys_platform != 'darwin')", specifier = ">=1.5.0" },
{ name = "jpype1", marker = "(python_full_version == '3.13.*' and
platform_machine != 'arm64') or (python_full_version == '3.13.*' and
sys_platform != 'darwin')", specifier = ">=1.5.1" },
]
+provides-extras = ["openlineage"]
[package.metadata.requires-dev]
dev = [
@@ -5882,6 +5890,7 @@ dev = [
{ name = "apache-airflow-devel-common", editable = "devel-common" },
{ name = "apache-airflow-providers-common-compat", editable =
"providers/common/compat" },
{ name = "apache-airflow-providers-common-sql", editable =
"providers/common/sql" },
+ { name = "apache-airflow-providers-openlineage", editable =
"providers/openlineage" },
{ name = "apache-airflow-task-sdk", editable = "task-sdk" },
]
docs = [{ name = "apache-airflow-devel-common", extras = ["docs"], editable =
"devel-common" }]