This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e3428dd435e [OpenLineage] Added operator_provider_version to task event (#52468)
e3428dd435e is described below
commit e3428dd435e7af0bf5cb1ca3bd55d311abc41d81
Author: Rahul Madan <[email protected]>
AuthorDate: Mon Jun 30 13:52:47 2025 +0530
[OpenLineage] Added operator_provider_version to task event (#52468)
* added another attribute containing the provider package version of the
operator being used.
Signed-off-by: Rahul Madan <[email protected]>
* precommit run
Signed-off-by: Rahul Madan <[email protected]>
---------
Signed-off-by: Rahul Madan <[email protected]>
---
.../airflow/providers/openlineage/utils/utils.py | 27 +++++++
.../tests/unit/openlineage/utils/test_utils.py | 85 ++++++++++++++++++++++
2 files changed, 112 insertions(+)
diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index afe41bcc318..9ca1ba427ee 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -128,6 +128,32 @@ def get_operator_class(task: BaseOperator) -> type:
return task.__class__
+def get_operator_provider_version(operator: BaseOperator | MappedOperator) ->
str | None:
+ """Get the provider package version for the given operator."""
+ try:
+ class_path = get_fully_qualified_class_name(operator)
+
+ if not class_path.startswith("airflow.providers."):
+ return None
+
+ from airflow.providers_manager import ProvidersManager
+
+ providers_manager = ProvidersManager()
+
+ for package_name, provider_info in providers_manager.providers.items():
+ if package_name.startswith("apache-airflow-providers-"):
+ provider_module_path = package_name.replace(
+ "apache-airflow-providers-", "airflow.providers."
+ ).replace("-", ".")
+ if class_path.startswith(provider_module_path + "."):
+ return provider_info.version
+
+ return None
+
+ except Exception:
+ return None
+
+
def get_job_name(task: TaskInstance | RuntimeTaskInstance) -> str:
return f"{task.dag_id}.{task.task_id}"
@@ -511,6 +537,7 @@ class TaskInfo(InfoJsonEncodable):
),
"inlets": lambda task: [AssetInfo(i) for i in task.inlets if
isinstance(i, Asset)],
"outlets": lambda task: [AssetInfo(o) for o in task.outlets if
isinstance(o, Asset)],
+ "operator_provider_version": lambda task:
get_operator_provider_version(task),
}
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
index 6357913e316..d3aea277e79 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
@@ -54,6 +54,7 @@ from airflow.providers.openlineage.utils.utils import (
get_fully_qualified_class_name,
get_job_name,
get_operator_class,
+ get_operator_provider_version,
get_user_provided_run_facets,
)
from airflow.providers.standard.operators.empty import EmptyOperator
@@ -1600,6 +1601,7 @@ def test_task_info_af3():
"multiple_outputs": False,
"operator_class": "CustomOperator",
"operator_class_path": get_fully_qualified_class_name(task_10),
+ "operator_provider_version": None, # Custom operator doesn't have
provider version
"outlets": "[{'uri': 'uri2', 'extra': {'b': 2}}, {'uri': 'uri3',
'extra': {'c': 3}}]",
"owner": "airflow",
"priority_weight": 1,
@@ -1677,6 +1679,7 @@ def test_task_info_af2():
"multiple_outputs": False,
"operator_class": "CustomOperator",
"operator_class_path": get_fully_qualified_class_name(task_10),
+ "operator_provider_version": None, # Custom operator doesn't have
provider version
"outlets": "[{'uri': 'uri2', 'extra': {'b': 2}}, {'uri': 'uri3',
'extra': {'c': 3}}]",
"owner": "airflow",
"priority_weight": 1,
@@ -1698,3 +1701,85 @@ def test_task_info_complete():
task_0 = BashOperator(task_id="task_0", bash_command="exit 0;")
result = TaskInfoComplete(task_0)
assert "'bash_command': 'exit 0;'" in str(result)
+
+
+@patch("airflow.providers.openlineage.utils.utils.get_fully_qualified_class_name")
+def test_get_operator_provider_version_exception_handling(mock_class_name):
+ mock_class_name.side_effect = Exception("Test exception")
+ operator = MagicMock()
+ assert get_operator_provider_version(operator) is None
+
+
+def test_get_operator_provider_version_for_core_operator():
+ """Test that get_operator_provider_version returns None for core
operators."""
+ operator = BaseOperator(task_id="test_task")
+ result = get_operator_provider_version(operator)
+ assert result is None
+
+
+@patch("airflow.providers_manager.ProvidersManager")
+def
test_get_operator_provider_version_for_provider_operator(mock_providers_manager):
+ """Test that get_operator_provider_version returns version for provider
operators."""
+ # Mock ProvidersManager
+ mock_manager_instance = MagicMock()
+ mock_providers_manager.return_value = mock_manager_instance
+
+ # Mock providers data
+ mock_manager_instance.providers = {
+ "apache-airflow-providers-standard": MagicMock(version="1.2.0"),
+ "apache-airflow-providers-amazon": MagicMock(version="8.12.0"),
+ "apache-airflow-providers-google": MagicMock(version="10.5.0"),
+ }
+
+ # Test with BashOperator (standard provider)
+ operator = BashOperator(task_id="test_task", bash_command="echo test")
+ result = get_operator_provider_version(operator)
+ assert result == "1.2.0"
+
+
+@patch("airflow.providers_manager.ProvidersManager")
+def
test_get_operator_provider_version_provider_not_found(mock_providers_manager):
+ """Test that get_operator_provider_version returns None when provider is
not found."""
+ # Mock ProvidersManager with no matching provider
+ mock_manager_instance = MagicMock()
+ mock_providers_manager.return_value = mock_manager_instance
+ mock_manager_instance.providers = {
+ "apache-airflow-providers-amazon": MagicMock(version="8.12.0"),
+ "apache-airflow-providers-google": MagicMock(version="10.5.0"),
+ }
+
+ operator = BashOperator(task_id="test_task", bash_command="echo test")
+ result = get_operator_provider_version(operator)
+ assert result is None
+
+
+def test_get_operator_provider_version_for_custom_operator():
+ """Test that get_operator_provider_version returns None for custom
operators."""
+
+ # Create a custom operator that doesn't belong to any provider
+ class CustomOperator(BaseOperator):
+ def execute(self, context):
+ pass
+
+ operator = CustomOperator(task_id="test_task")
+ result = get_operator_provider_version(operator)
+ assert result is None
+
+
+@patch("airflow.providers_manager.ProvidersManager")
+def
test_get_operator_provider_version_for_mapped_operator(mock_providers_manager):
+ """Test that get_operator_provider_version works with mapped operators."""
+ # Mock ProvidersManager
+ mock_manager_instance = MagicMock()
+ mock_providers_manager.return_value = mock_manager_instance
+
+ # Mock providers data
+ mock_manager_instance.providers = {
+ "apache-airflow-providers-standard": MagicMock(version="1.2.0"),
+ "apache-airflow-providers-amazon": MagicMock(version="8.12.0"),
+ }
+
+ # Test with mapped BashOperator (standard provider)
+ mapped_operator =
BashOperator.partial(task_id="test_task").expand(bash_command=["echo 1", "echo
2"])
+ result = get_operator_provider_version(mapped_operator)
+ assert result == "1.2.0"