This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a993db24f79 feat: Add helper for OpenLineage version check (#47897)
a993db24f79 is described below

commit a993db24f790784a0b43b8a33c28565ed0412d82
Author: Kacper Muda <[email protected]>
AuthorDate: Tue Mar 18 11:46:00 2025 +0100

    feat: Add helper for OpenLineage version check (#47897)
---
 .../providers/common/compat/openlineage/check.py   | 106 ++++++++
 .../unit/common/compat/openlineage/test_check.py   | 280 +++++++++++++++++++++
 2 files changed, 386 insertions(+)

diff --git 
a/providers/common/compat/src/airflow/providers/common/compat/openlineage/check.py
 
b/providers/common/compat/src/airflow/providers/common/compat/openlineage/check.py
new file mode 100644
index 00000000000..16626cb9ea5
--- /dev/null
+++ 
b/providers/common/compat/src/airflow/providers/common/compat/openlineage/check.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import functools
+import logging
+from importlib import metadata
+
+from packaging.version import Version
+
+log = logging.getLogger(__name__)
+
+
+def require_openlineage_version(
+    provider_min_version: str | None = None, client_min_version: str | None = 
None
+):
+    """
+    Enforce minimum version requirements for OpenLineage provider or client.
+
+    Some providers, such as Snowflake and DBT Cloud, do not require an 
OpenLineage provider but may
+    offer optional features that depend on it. These features are generally 
available starting
+    from a specific version of the OpenLineage provider or client. This 
decorator helps ensure compatibility,
+    preventing import errors and providing clear logs about version 
requirements.
+
+    Args:
+        provider_min_version: Optional minimum version requirement for 
apache-airflow-providers-openlineage
+        client_min_version: Optional minimum version requirement for 
openlineage-python
+
+    Raises:
+        ValueError: If neither `provider_min_version` nor `client_min_version` 
is provided.
+        TypeError: If the decorator is used without parentheses (e.g., 
`@require_openlineage_version`).
+    """
+    err_msg = (
+        "`require_openlineage_version` decorator must be used with at least 
one argument: "
+        "'provider_min_version' or 'client_min_version', "
+        'e.g., @require_openlineage_version(provider_min_version="1.0.0")'
+    )
+    # Detect if decorator is mistakenly used without arguments
+    if callable(provider_min_version) and client_min_version is None:
+        raise TypeError(err_msg)
+
+    # Ensure at least one argument is provided
+    if provider_min_version is None and client_min_version is None:
+        raise ValueError(err_msg)
+
+    def decorator(func):
+        @functools.wraps(func)
+        def wrapper(*args, **kwargs):
+            if provider_min_version:
+                try:
+                    provider_version: str = 
metadata.version("apache-airflow-providers-openlineage")
+                except metadata.PackageNotFoundError:
+                    try:
+                        from airflow.providers.openlineage import __version__ 
as provider_version
+                    except (ImportError, AttributeError, ModuleNotFoundError):
+                        log.info(
+                            "OpenLineage provider not found or has no version, 
skipping function `%s` execution",
+                            func.__name__,
+                        )
+                        return None
+
+                if provider_version and Version(provider_version) < 
Version(provider_min_version):
+                    log.info(
+                        "OpenLineage provider version `%s` is lower than 
required `%s`, skipping function `%s` execution",
+                        provider_version,
+                        provider_min_version,
+                        func.__name__,
+                    )
+                    return None
+
+            if client_min_version:
+                try:
+                    client_version: str = 
metadata.version("openlineage-python")
+                except metadata.PackageNotFoundError:
+                    log.info("OpenLineage client not found, skipping function 
`%s` execution", func.__name__)
+                    return None
+
+                if client_version and Version(client_version) < 
Version(client_min_version):
+                    log.info(
+                        "OpenLineage client version `%s` is lower than 
required `%s`, skipping function `%s` execution",
+                        client_version,
+                        client_min_version,
+                        func.__name__,
+                    )
+                    return None
+
+            return func(*args, **kwargs)
+
+        return wrapper
+
+    return decorator
diff --git 
a/providers/common/compat/tests/unit/common/compat/openlineage/test_check.py 
b/providers/common/compat/tests/unit/common/compat/openlineage/test_check.py
new file mode 100644
index 00000000000..75937b3232b
--- /dev/null
+++ b/providers/common/compat/tests/unit/common/compat/openlineage/test_check.py
@@ -0,0 +1,280 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import logging
+import sys
+import types
+from importlib import metadata
+from unittest.mock import patch
+
+import pytest
+
+from airflow.providers.common.compat.openlineage.check import 
require_openlineage_version
+
+
+def _mock_version(package):
+    if package == "apache-airflow-providers-openlineage":
+        return "1.0.0"
+    if package == "openlineage-python":
+        return "1.0.0"
+    raise Exception("Unexpected package")
+
+
+def test_decorator_without_arguments():
+    with pytest.raises(TypeError) as excinfo:
+
+        @require_openlineage_version  # used without parentheses
+        def dummy():
+            return "result"
+
+    expected_error = (
+        "`require_openlineage_version` decorator must be used with at least 
one argument: "
+        "'provider_min_version' or 'client_min_version', "
+        'e.g., @require_openlineage_version(provider_min_version="1.0.0")'
+    )
+    assert str(excinfo.value) == expected_error
+
+
+def test_decorator_without_arguments_with_parentheses():
+    with pytest.raises(ValueError) as excinfo:
+
+        @require_openlineage_version()
+        def dummy():
+            return "result"
+
+    expected_error = (
+        "`require_openlineage_version` decorator must be used with at least 
one argument: "
+        "'provider_min_version' or 'client_min_version', "
+        'e.g., @require_openlineage_version(provider_min_version="1.0.0")'
+    )
+    assert str(excinfo.value) == expected_error
+
+
+def test_no_arguments_provided():
+    with pytest.raises(ValueError) as excinfo:
+        require_openlineage_version()
+    expected_error = (
+        "`require_openlineage_version` decorator must be used with at least 
one argument: "
+        "'provider_min_version' or 'client_min_version', "
+        'e.g., @require_openlineage_version(provider_min_version="1.0.0")'
+    )
+    assert str(excinfo.value) == expected_error
+
+
[email protected]("provider_min_version", ("1.0.0", "0.9", "0", 
"0.9.9", "1.0.0.dev0", "1.0.0rc1"))
+@patch("importlib.metadata.version", side_effect=_mock_version)
+def test_provider_version_sufficient(mock_version, caplog, 
provider_min_version):
+    @require_openlineage_version(provider_min_version=provider_min_version)
+    def dummy():
+        return "result"
+
+    caplog.set_level(logging.INFO)
+    result = dummy()
+    assert result == "result"
+    # No log messages about skipping should be emitted.
+    assert "skipping function" not in caplog.text
+
+
[email protected]("provider_min_version", ("1.1.0", "1.0.1.dev0", 
"1.0.1rc1", "2", "1.1"))
+@patch("importlib.metadata.version", side_effect=_mock_version)
+def test_provider_version_insufficient(mock_version, caplog, 
provider_min_version):
+    @require_openlineage_version(provider_min_version=provider_min_version)
+    def dummy():
+        return "result"
+
+    caplog.set_level(logging.INFO)
+    result = dummy()
+    assert result is None
+
+    expected_log = (
+        f"OpenLineage provider version `1.0.0` is lower than required 
`{provider_min_version}`, "
+        "skipping function `dummy` execution"
+    )
+    assert expected_log in caplog.text
+
+
+def test_provider_not_found(caplog):
+    def fake_version(package):
+        if package == "apache-airflow-providers-openlineage":
+            raise metadata.PackageNotFoundError
+        raise Exception("Unexpected package")
+
+    with patch("importlib.metadata.version", side_effect=fake_version):
+        # Simulate that the fallback import returns a module without 
__version__
+        dummy_module = types.ModuleType("airflow.providers.openlineage")
+        with patch.dict(sys.modules, {"airflow.providers.openlineage": 
dummy_module}):
+
+            @require_openlineage_version(provider_min_version="1.0.0")
+            def dummy():
+                return "result"
+
+            caplog.set_level(logging.INFO)
+            result = dummy()
+            assert result is None
+
+            expected_log = (
+                "OpenLineage provider not found or has no version, skipping 
function `dummy` execution"
+            )
+            assert expected_log in caplog.text
+
+
+def test_provider_fallback_import(caplog):
+    def fake_version(package):
+        if package == "apache-airflow-providers-openlineage":
+            raise metadata.PackageNotFoundError
+        raise Exception("Unexpected package")
+
+    with patch("importlib.metadata.version", side_effect=fake_version):
+        # Simulate a module with a sufficient __version__
+        dummy_module = types.ModuleType("airflow.providers.openlineage")
+        dummy_module.__version__ = "1.2.0"
+        with patch.dict(sys.modules, {"airflow.providers.openlineage": 
dummy_module}):
+
+            @require_openlineage_version(provider_min_version="1.0.0")
+            def dummy():
+                return "result"
+
+            caplog.set_level(logging.INFO)
+            result = dummy()
+            assert result == "result"
+            assert "skipping function" not in caplog.text
+
+
[email protected]("client_min_version", ("1.0.0", "0.9", "0", "0.9.9", 
"1.0.0.dev0", "1.0.0rc1"))
+@patch("importlib.metadata.version", side_effect=_mock_version)
+def test_client_version_sufficient(mock_version, caplog, client_min_version):
+    @require_openlineage_version(client_min_version=client_min_version)
+    def dummy():
+        return "result"
+
+    caplog.set_level(logging.INFO)
+    result = dummy()
+    assert result == "result"
+    # No log messages about skipping should be emitted.
+    assert "skipping function" not in caplog.text
+
+
[email protected]("client_min_version", ("1.1.0", "1.0.1.dev0", 
"1.0.1rc1", "2", "1.1"))
+@patch("importlib.metadata.version", side_effect=_mock_version)
+def test_client_version_insufficient(mock_version, caplog, client_min_version):
+    @require_openlineage_version(client_min_version=client_min_version)
+    def dummy():
+        return "result"
+
+    caplog.set_level(logging.INFO)
+    result = dummy()
+    assert result is None
+
+    expected_log = (
+        f"OpenLineage client version `1.0.0` is lower than required 
`{client_min_version}`, "
+        "skipping function `dummy` execution"
+    )
+    assert expected_log in caplog.text
+
+
+def test_client_version_not_found(caplog):
+    def fake_version(package):
+        if package == "openlineage-python":
+            raise metadata.PackageNotFoundError
+        raise Exception("Unexpected package")
+
+    with patch("importlib.metadata.version", side_effect=fake_version):
+
+        @require_openlineage_version(client_min_version="1.0.0")
+        def dummy():
+            return "result"
+
+        caplog.set_level(logging.INFO)
+        result = dummy()
+        assert result is None
+        expected_log = "OpenLineage client not found, skipping function 
`dummy` execution"
+        assert expected_log in caplog.text
+
+
[email protected]("client_min_version", ("1.1.0", "1.0.1.dev0", 
"1.0.1rc1", "2", "1.1"))
+@patch("importlib.metadata.version", side_effect=_mock_version)
+def test_client_version_insufficient_when_both_passed(mock_version, caplog, 
client_min_version):
+    @require_openlineage_version(provider_min_version="1.0.0", 
client_min_version=client_min_version)
+    def dummy():
+        return "result"
+
+    caplog.set_level(logging.INFO)
+    result = dummy()
+    assert result is None
+
+    expected_log = (
+        f"OpenLineage client version `1.0.0` is lower than required 
`{client_min_version}`, "
+        "skipping function `dummy` execution"
+    )
+    assert expected_log in caplog.text
+
+
[email protected]("provider_min_version", ("1.1.0", "1.0.1.dev0", 
"1.0.1rc1", "2", "1.1"))
+@patch("importlib.metadata.version", side_effect=_mock_version)
+def test_provider_version_insufficient_when_both_passed(mock_version, caplog, 
provider_min_version):
+    @require_openlineage_version(provider_min_version=provider_min_version, 
client_min_version="1.0.0")
+    def dummy():
+        return "result"
+
+    caplog.set_level(logging.INFO)
+    result = dummy()
+    assert result is None
+
+    expected_log = (
+        f"OpenLineage provider version `1.0.0` is lower than required 
`{provider_min_version}`, "
+        "skipping function `dummy` execution"
+    )
+    assert expected_log in caplog.text
+
+
[email protected]("client_min_version", ("1.0.0", "0.9", "0", "0.9.9", 
"1.0.0.dev0", "1.0.0rc1"))
[email protected]("provider_min_version", ("1.0.0", "0.9", "0", 
"0.9.9", "1.0.0.dev0", "1.0.0rc1"))
+@patch("importlib.metadata.version", side_effect=_mock_version)
+def test_both_versions_sufficient(mock_version, caplog, provider_min_version, 
client_min_version):
+    @require_openlineage_version(
+        provider_min_version=provider_min_version, 
client_min_version=client_min_version
+    )
+    def dummy():
+        return "result"
+
+    caplog.set_level(logging.INFO)
+    result = dummy()
+    assert result == "result"
+    assert "skipping function" not in caplog.text
+
+
[email protected]("client_min_version", ("1.1.0", "1.0.1.dev0", 
"1.0.1rc1", "2", "1.1"))
[email protected]("provider_min_version", ("1.1.0", "1.0.1.dev0", 
"1.0.1rc1", "2", "1.1"))
+@patch("importlib.metadata.version", side_effect=_mock_version)
+def test_both_versions_insufficient(mock_version, caplog, 
provider_min_version, client_min_version):
+    @require_openlineage_version(
+        provider_min_version=provider_min_version, 
client_min_version=client_min_version
+    )
+    def dummy():
+        return "result"
+
+    caplog.set_level(logging.INFO)
+    result = dummy()
+    assert result is None
+
+    expected_log = (
+        f"OpenLineage provider version `1.0.0` is lower than required 
`{provider_min_version}`, "
+        "skipping function `dummy` execution"
+    )
+    assert expected_log in caplog.text

Reply via email to