This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a993db24f79 feat: Add helper for OpenLineage version check (#47897)
a993db24f79 is described below
commit a993db24f790784a0b43b8a33c28565ed0412d82
Author: Kacper Muda <[email protected]>
AuthorDate: Tue Mar 18 11:46:00 2025 +0100
feat: Add helper for OpenLineage version check (#47897)
---
.../providers/common/compat/openlineage/check.py | 106 ++++++++
.../unit/common/compat/openlineage/test_check.py | 280 +++++++++++++++++++++
2 files changed, 386 insertions(+)
diff --git
a/providers/common/compat/src/airflow/providers/common/compat/openlineage/check.py
b/providers/common/compat/src/airflow/providers/common/compat/openlineage/check.py
new file mode 100644
index 00000000000..16626cb9ea5
--- /dev/null
+++
b/providers/common/compat/src/airflow/providers/common/compat/openlineage/check.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import functools
+import logging
+from importlib import metadata
+
+from packaging.version import Version
+
+log = logging.getLogger(__name__)
+
+
+def require_openlineage_version(
+ provider_min_version: str | None = None, client_min_version: str | None =
None
+):
+ """
+ Enforce minimum version requirements for OpenLineage provider or client.
+
+ Some providers, such as Snowflake and DBT Cloud, do not require an
OpenLineage provider but may
+ offer optional features that depend on it. These features are generally
available starting
+ from a specific version of the OpenLineage provider or client. This
decorator helps ensure compatibility,
+ preventing import errors and providing clear logs about version
requirements.
+
+ Args:
+ provider_min_version: Optional minimum version requirement for
apache-airflow-providers-openlineage
+ client_min_version: Optional minimum version requirement for
openlineage-python
+
+ Raises:
+ ValueError: If neither `provider_min_version` nor `client_min_version`
is provided.
+ TypeError: If the decorator is used without parentheses (e.g.,
`@require_openlineage_version`).
+ """
+ err_msg = (
+ "`require_openlineage_version` decorator must be used with at least
one argument: "
+ "'provider_min_version' or 'client_min_version', "
+ 'e.g., @require_openlineage_version(provider_min_version="1.0.0")'
+ )
+ # Detect if decorator is mistakenly used without arguments
+ if callable(provider_min_version) and client_min_version is None:
+ raise TypeError(err_msg)
+
+ # Ensure at least one argument is provided
+ if provider_min_version is None and client_min_version is None:
+ raise ValueError(err_msg)
+
+ def decorator(func):
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ if provider_min_version:
+ try:
+ provider_version: str =
metadata.version("apache-airflow-providers-openlineage")
+ except metadata.PackageNotFoundError:
+ try:
+ from airflow.providers.openlineage import __version__
as provider_version
+ except (ImportError, AttributeError, ModuleNotFoundError):
+ log.info(
+ "OpenLineage provider not found or has no version,
skipping function `%s` execution",
+ func.__name__,
+ )
+ return None
+
+ if provider_version and Version(provider_version) <
Version(provider_min_version):
+ log.info(
+ "OpenLineage provider version `%s` is lower than
required `%s`, skipping function `%s` execution",
+ provider_version,
+ provider_min_version,
+ func.__name__,
+ )
+ return None
+
+ if client_min_version:
+ try:
+ client_version: str =
metadata.version("openlineage-python")
+ except metadata.PackageNotFoundError:
+ log.info("OpenLineage client not found, skipping function
`%s` execution", func.__name__)
+ return None
+
+ if client_version and Version(client_version) <
Version(client_min_version):
+ log.info(
+ "OpenLineage client version `%s` is lower than
required `%s`, skipping function `%s` execution",
+ client_version,
+ client_min_version,
+ func.__name__,
+ )
+ return None
+
+ return func(*args, **kwargs)
+
+ return wrapper
+
+ return decorator
diff --git
a/providers/common/compat/tests/unit/common/compat/openlineage/test_check.py
b/providers/common/compat/tests/unit/common/compat/openlineage/test_check.py
new file mode 100644
index 00000000000..75937b3232b
--- /dev/null
+++ b/providers/common/compat/tests/unit/common/compat/openlineage/test_check.py
@@ -0,0 +1,280 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import logging
+import sys
+import types
+from importlib import metadata
+from unittest.mock import patch
+
+import pytest
+
+from airflow.providers.common.compat.openlineage.check import
require_openlineage_version
+
+
+def _mock_version(package):
+ if package == "apache-airflow-providers-openlineage":
+ return "1.0.0"
+ if package == "openlineage-python":
+ return "1.0.0"
+ raise Exception("Unexpected package")
+
+
+def test_decorator_without_arguments():
+ with pytest.raises(TypeError) as excinfo:
+
+ @require_openlineage_version # used without parentheses
+ def dummy():
+ return "result"
+
+ expected_error = (
+ "`require_openlineage_version` decorator must be used with at least
one argument: "
+ "'provider_min_version' or 'client_min_version', "
+ 'e.g., @require_openlineage_version(provider_min_version="1.0.0")'
+ )
+ assert str(excinfo.value) == expected_error
+
+
+def test_decorator_without_arguments_with_parentheses():
+ with pytest.raises(ValueError) as excinfo:
+
+ @require_openlineage_version()
+ def dummy():
+ return "result"
+
+ expected_error = (
+ "`require_openlineage_version` decorator must be used with at least
one argument: "
+ "'provider_min_version' or 'client_min_version', "
+ 'e.g., @require_openlineage_version(provider_min_version="1.0.0")'
+ )
+ assert str(excinfo.value) == expected_error
+
+
+def test_no_arguments_provided():
+ with pytest.raises(ValueError) as excinfo:
+ require_openlineage_version()
+ expected_error = (
+ "`require_openlineage_version` decorator must be used with at least
one argument: "
+ "'provider_min_version' or 'client_min_version', "
+ 'e.g., @require_openlineage_version(provider_min_version="1.0.0")'
+ )
+ assert str(excinfo.value) == expected_error
+
+
[email protected]("provider_min_version", ("1.0.0", "0.9", "0",
"0.9.9", "1.0.0.dev0", "1.0.0rc1"))
+@patch("importlib.metadata.version", side_effect=_mock_version)
+def test_provider_version_sufficient(mock_version, caplog,
provider_min_version):
+ @require_openlineage_version(provider_min_version=provider_min_version)
+ def dummy():
+ return "result"
+
+ caplog.set_level(logging.INFO)
+ result = dummy()
+ assert result == "result"
+ # No log messages about skipping should be emitted.
+ assert "skipping function" not in caplog.text
+
+
[email protected]("provider_min_version", ("1.1.0", "1.0.1.dev0",
"1.0.1rc1", "2", "1.1"))
+@patch("importlib.metadata.version", side_effect=_mock_version)
+def test_provider_version_insufficient(mock_version, caplog,
provider_min_version):
+ @require_openlineage_version(provider_min_version=provider_min_version)
+ def dummy():
+ return "result"
+
+ caplog.set_level(logging.INFO)
+ result = dummy()
+ assert result is None
+
+ expected_log = (
+ f"OpenLineage provider version `1.0.0` is lower than required
`{provider_min_version}`, "
+ "skipping function `dummy` execution"
+ )
+ assert expected_log in caplog.text
+
+
+def test_provider_not_found(caplog):
+ def fake_version(package):
+ if package == "apache-airflow-providers-openlineage":
+ raise metadata.PackageNotFoundError
+ raise Exception("Unexpected package")
+
+ with patch("importlib.metadata.version", side_effect=fake_version):
+ # Simulate that the fallback import returns a module without
__version__
+ dummy_module = types.ModuleType("airflow.providers.openlineage")
+ with patch.dict(sys.modules, {"airflow.providers.openlineage":
dummy_module}):
+
+ @require_openlineage_version(provider_min_version="1.0.0")
+ def dummy():
+ return "result"
+
+ caplog.set_level(logging.INFO)
+ result = dummy()
+ assert result is None
+
+ expected_log = (
+ "OpenLineage provider not found or has no version, skipping
function `dummy` execution"
+ )
+ assert expected_log in caplog.text
+
+
+def test_provider_fallback_import(caplog):
+ def fake_version(package):
+ if package == "apache-airflow-providers-openlineage":
+ raise metadata.PackageNotFoundError
+ raise Exception("Unexpected package")
+
+ with patch("importlib.metadata.version", side_effect=fake_version):
+ # Simulate a module with a sufficient __version__
+ dummy_module = types.ModuleType("airflow.providers.openlineage")
+ dummy_module.__version__ = "1.2.0"
+ with patch.dict(sys.modules, {"airflow.providers.openlineage":
dummy_module}):
+
+ @require_openlineage_version(provider_min_version="1.0.0")
+ def dummy():
+ return "result"
+
+ caplog.set_level(logging.INFO)
+ result = dummy()
+ assert result == "result"
+ assert "skipping function" not in caplog.text
+
+
[email protected]("client_min_version", ("1.0.0", "0.9", "0", "0.9.9",
"1.0.0.dev0", "1.0.0rc1"))
+@patch("importlib.metadata.version", side_effect=_mock_version)
+def test_client_version_sufficient(mock_version, caplog, client_min_version):
+ @require_openlineage_version(client_min_version=client_min_version)
+ def dummy():
+ return "result"
+
+ caplog.set_level(logging.INFO)
+ result = dummy()
+ assert result == "result"
+ # No log messages about skipping should be emitted.
+ assert "skipping function" not in caplog.text
+
+
[email protected]("client_min_version", ("1.1.0", "1.0.1.dev0",
"1.0.1rc1", "2", "1.1"))
+@patch("importlib.metadata.version", side_effect=_mock_version)
+def test_client_version_insufficient(mock_version, caplog, client_min_version):
+ @require_openlineage_version(client_min_version=client_min_version)
+ def dummy():
+ return "result"
+
+ caplog.set_level(logging.INFO)
+ result = dummy()
+ assert result is None
+
+ expected_log = (
+ f"OpenLineage client version `1.0.0` is lower than required
`{client_min_version}`, "
+ "skipping function `dummy` execution"
+ )
+ assert expected_log in caplog.text
+
+
+def test_client_version_not_found(caplog):
+ def fake_version(package):
+ if package == "openlineage-python":
+ raise metadata.PackageNotFoundError
+ raise Exception("Unexpected package")
+
+ with patch("importlib.metadata.version", side_effect=fake_version):
+
+ @require_openlineage_version(client_min_version="1.0.0")
+ def dummy():
+ return "result"
+
+ caplog.set_level(logging.INFO)
+ result = dummy()
+ assert result is None
+ expected_log = "OpenLineage client not found, skipping function
`dummy` execution"
+ assert expected_log in caplog.text
+
+
[email protected]("client_min_version", ("1.1.0", "1.0.1.dev0",
"1.0.1rc1", "2", "1.1"))
+@patch("importlib.metadata.version", side_effect=_mock_version)
+def test_client_version_insufficient_when_both_passed(mock_version, caplog,
client_min_version):
+ @require_openlineage_version(provider_min_version="1.0.0",
client_min_version=client_min_version)
+ def dummy():
+ return "result"
+
+ caplog.set_level(logging.INFO)
+ result = dummy()
+ assert result is None
+
+ expected_log = (
+ f"OpenLineage client version `1.0.0` is lower than required
`{client_min_version}`, "
+ "skipping function `dummy` execution"
+ )
+ assert expected_log in caplog.text
+
+
[email protected]("provider_min_version", ("1.1.0", "1.0.1.dev0",
"1.0.1rc1", "2", "1.1"))
+@patch("importlib.metadata.version", side_effect=_mock_version)
+def test_provider_version_insufficient_when_both_passed(mock_version, caplog,
provider_min_version):
+ @require_openlineage_version(provider_min_version=provider_min_version,
client_min_version="1.0.0")
+ def dummy():
+ return "result"
+
+ caplog.set_level(logging.INFO)
+ result = dummy()
+ assert result is None
+
+ expected_log = (
+ f"OpenLineage provider version `1.0.0` is lower than required
`{provider_min_version}`, "
+ "skipping function `dummy` execution"
+ )
+ assert expected_log in caplog.text
+
+
[email protected]("client_min_version", ("1.0.0", "0.9", "0", "0.9.9",
"1.0.0.dev0", "1.0.0rc1"))
[email protected]("provider_min_version", ("1.0.0", "0.9", "0",
"0.9.9", "1.0.0.dev0", "1.0.0rc1"))
+@patch("importlib.metadata.version", side_effect=_mock_version)
+def test_both_versions_sufficient(mock_version, caplog, provider_min_version,
client_min_version):
+ @require_openlineage_version(
+ provider_min_version=provider_min_version,
client_min_version=client_min_version
+ )
+ def dummy():
+ return "result"
+
+ caplog.set_level(logging.INFO)
+ result = dummy()
+ assert result == "result"
+ assert "skipping function" not in caplog.text
+
+
[email protected]("client_min_version", ("1.1.0", "1.0.1.dev0",
"1.0.1rc1", "2", "1.1"))
[email protected]("provider_min_version", ("1.1.0", "1.0.1.dev0",
"1.0.1rc1", "2", "1.1"))
+@patch("importlib.metadata.version", side_effect=_mock_version)
+def test_both_versions_insufficient(mock_version, caplog,
provider_min_version, client_min_version):
+ @require_openlineage_version(
+ provider_min_version=provider_min_version,
client_min_version=client_min_version
+ )
+ def dummy():
+ return "result"
+
+ caplog.set_level(logging.INFO)
+ result = dummy()
+ assert result is None
+
+ expected_log = (
+ f"OpenLineage provider version `1.0.0` is lower than required
`{provider_min_version}`, "
+ "skipping function `dummy` execution"
+ )
+ assert expected_log in caplog.text