This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b30bfa34fae Remove global from openlineage provider (#58868)
b30bfa34fae is described below
commit b30bfa34fae8c7abf1a6dcf93ffc8b073e412c55
Author: Jens Scheffler <[email protected]>
AuthorDate: Mon Dec 1 22:29:26 2025 +0100
Remove global from openlineage provider (#58868)
* Remove global from openlineage provider
* Fix pytests based on global variable
---
.../unit/google/cloud/openlineage/test_utils.py | 37 ++++++---
.../unit/google/cloud/operators/test_dataproc.py | 90 ++++++++++++++--------
.../providers/openlineage/plugins/listener.py | 9 +--
.../tests/unit/openlineage/utils/test_spark.py | 31 +++++---
4 files changed, 109 insertions(+), 58 deletions(-)
diff --git a/providers/google/tests/unit/google/cloud/openlineage/test_utils.py b/providers/google/tests/unit/google/cloud/openlineage/test_utils.py
index 73d1805c35d..e7bc9b22d9f 100644
--- a/providers/google/tests/unit/google/cloud/openlineage/test_utils.py
+++ b/providers/google/tests/unit/google/cloud/openlineage/test_utils.py
@@ -18,6 +18,7 @@ from __future__ import annotations
import datetime as dt
import json
+from unittest import mock
from unittest.mock import MagicMock, patch
import pytest
@@ -624,13 +625,15 @@ def
test_inject_openlineage_properties_into_dataproc_job_parent_info_only(mock_i
assert result == {"sparkJob": {"properties": expected_properties}}
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
def test_inject_openlineage_properties_into_dataproc_job_transport_info_only(
mock_is_ol_accessible, mock_ol_listener
):
mock_is_ol_accessible.return_value = True
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
expected_properties = {
@@ -642,13 +645,15 @@ def
test_inject_openlineage_properties_into_dataproc_job_transport_info_only(
assert result == {"sparkJob": {"properties": expected_properties}}
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
def test_inject_openlineage_properties_into_dataproc_job_all_injections(
mock_is_ol_accessible, mock_ol_listener
):
mock_is_ol_accessible.return_value = True
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
expected_properties = {
@@ -899,13 +904,15 @@ def
test_inject_openlineage_properties_into_dataproc_batch_parent_info_only(mock
assert result == expected_batch
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
def test_inject_openlineage_properties_into_dataproc_batch_transport_info_only(
mock_is_ol_accessible, mock_ol_listener
):
mock_is_ol_accessible.return_value = True
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
expected_properties = {"existingProperty": "value",
**OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_SPARK_PROPERTIES}
@@ -917,13 +924,15 @@ def
test_inject_openlineage_properties_into_dataproc_batch_transport_info_only(
assert result == expected_batch
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
def test_inject_openlineage_properties_into_dataproc_batch_all_injections(
mock_is_ol_accessible, mock_ol_listener
):
mock_is_ol_accessible.return_value = True
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
expected_properties = {
@@ -1072,13 +1081,15 @@ def
test_inject_openlineage_properties_into_dataproc_workflow_template_parent_in
assert result == expected_template
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
def
test_inject_openlineage_properties_into_dataproc_workflow_template_transport_info_only(
mock_is_ol_accessible, mock_ol_listener
):
mock_is_ol_accessible.return_value = True
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
template = {
@@ -1154,13 +1165,15 @@ def
test_inject_openlineage_properties_into_dataproc_workflow_template_transport
assert result == expected_template
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
def
test_inject_openlineage_properties_into_dataproc_workflow_template_all_injections(
mock_is_ol_accessible, mock_ol_listener
):
mock_is_ol_accessible.return_value = True
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
template = {
diff --git a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
index 56b59009ba3..d525e97d5a5 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
@@ -1506,7 +1506,7 @@ class TestDataprocSubmitJobOperator(DataprocJobTestBase):
)
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
- @mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+ @mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_execute_openlineage_http_transport_info_injection(
@@ -1514,7 +1514,9 @@ class TestDataprocSubmitJobOperator(DataprocJobTestBase):
):
mock_ol_accessible.return_value = True
mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
job_config = {
@@ -1561,7 +1563,7 @@ class TestDataprocSubmitJobOperator(DataprocJobTestBase):
)
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
- @mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+ @mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_execute_openlineage_all_info_injection(
@@ -1569,7 +1571,9 @@ class TestDataprocSubmitJobOperator(DataprocJobTestBase):
):
mock_ol_accessible.return_value = True
mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
job_config = {
@@ -1617,7 +1621,7 @@ class TestDataprocSubmitJobOperator(DataprocJobTestBase):
metadata=METADATA,
)
- @mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+ @mock.patch("airflow.providers.openlineage.plugins.listener.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_execute_openlineage_unsupported_transport_info_injection(
@@ -1634,7 +1638,9 @@ class TestDataprocSubmitJobOperator(DataprocJobTestBase):
flush=True,
messageKey="some",
)
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = KafkaTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = KafkaTransport(
kafka_config
)
job_config = {
@@ -1710,14 +1716,16 @@ class
TestDataprocSubmitJobOperator(DataprocJobTestBase):
metadata=METADATA,
)
- @mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+ @mock.patch("airflow.providers.openlineage.plugins.listener.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def
test_execute_openlineage_transport_info_injection_skipped_when_already_present(
self, mock_hook, mock_ol_accessible, mock_ol_listener
):
mock_ol_accessible.return_value = True
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
job_config = {
@@ -1793,14 +1801,16 @@ class
TestDataprocSubmitJobOperator(DataprocJobTestBase):
metadata=METADATA,
)
- @mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+ @mock.patch("airflow.providers.openlineage.plugins.listener.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def
test_execute_openlineage_transport_info_injection_skipped_by_default_unless_enabled(
self, mock_hook, mock_ol_accessible, mock_ol_listener
):
mock_ol_accessible.return_value = True
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
job_config = {
@@ -1875,14 +1885,16 @@ class
TestDataprocSubmitJobOperator(DataprocJobTestBase):
metadata=METADATA,
)
- @mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+ @mock.patch("airflow.providers.openlineage.plugins.listener.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def
test_execute_openlineage_transport_info_injection_skipped_when_ol_not_accessible(
self, mock_hook, mock_ol_accessible, mock_ol_listener
):
mock_ol_accessible.return_value = False
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
job_config = {
@@ -2783,7 +2795,7 @@ class
TestDataprocWorkflowTemplateInstantiateInlineOperator:
)
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
- @mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+ @mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_execute_openlineage_transport_info_injection(
@@ -2791,7 +2803,9 @@ class
TestDataprocWorkflowTemplateInstantiateInlineOperator:
):
mock_ol_accessible.return_value = True
mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
@@ -2889,7 +2903,7 @@ class
TestDataprocWorkflowTemplateInstantiateInlineOperator:
)
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
- @mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+ @mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_execute_openlineage_all_info_injection(
@@ -2897,7 +2911,9 @@ class
TestDataprocWorkflowTemplateInstantiateInlineOperator:
):
mock_ol_accessible.return_value = True
mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
template = {
@@ -3017,14 +3033,16 @@ class
TestDataprocWorkflowTemplateInstantiateInlineOperator:
metadata=METADATA,
)
- @mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+ @mock.patch("airflow.providers.openlineage.plugins.listener.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def
test_execute_openlineage_transport_info_injection_skipped_by_default_unless_enabled(
self, mock_hook, mock_ol_accessible, mock_ol_listener
):
mock_ol_accessible.return_value = True
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig("https://some-custom.url")
)
@@ -3065,14 +3083,16 @@ class
TestDataprocWorkflowTemplateInstantiateInlineOperator:
metadata=METADATA,
)
- @mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+ @mock.patch("airflow.providers.openlineage.plugins.listener.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def
test_execute_openlineage_transport_info_injection_skipped_when_ol_not_accessible(
self, mock_hook, mock_ol_accessible, mock_ol_listener
):
mock_ol_accessible.return_value = False
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig("https://some-custom.url")
)
@@ -3465,7 +3485,7 @@ class TestDataprocCreateBatchOperator:
@mock.patch.object(DataprocCreateBatchOperator, "log",
new_callable=mock.MagicMock)
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
- @mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+ @mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("Batch.to_dict"))
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
@@ -3474,7 +3494,9 @@ class TestDataprocCreateBatchOperator:
):
mock_ol_accessible.return_value = True
mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
expected_batch = {
@@ -3522,7 +3544,7 @@ class TestDataprocCreateBatchOperator:
)
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
- @mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+ @mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("Batch.to_dict"))
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
@@ -3531,7 +3553,9 @@ class TestDataprocCreateBatchOperator:
):
mock_ol_accessible.return_value = True
mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
expected_batch = {
@@ -3617,7 +3641,7 @@ class TestDataprocCreateBatchOperator:
metadata=METADATA,
)
- @mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+ @mock.patch("airflow.providers.openlineage.plugins.listener.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("Batch.to_dict"))
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
@@ -3625,7 +3649,9 @@ class TestDataprocCreateBatchOperator:
self, mock_hook, to_dict_mock, mock_ol_accessible, mock_ol_listener
):
mock_ol_accessible.return_value = True
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
batch = {
@@ -3703,7 +3729,7 @@ class TestDataprocCreateBatchOperator:
metadata=METADATA,
)
- @mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+ @mock.patch("airflow.providers.openlineage.plugins.listener.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("Batch.to_dict"))
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
@@ -3711,7 +3737,9 @@ class TestDataprocCreateBatchOperator:
self, mock_hook, to_dict_mock, mock_ol_accessible, mock_ol_listener
):
mock_ol_accessible.return_value = True
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
batch = {
@@ -3783,7 +3811,7 @@ class TestDataprocCreateBatchOperator:
metadata=METADATA,
)
- @mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+ @mock.patch("airflow.providers.openlineage.plugins.listener.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("Batch.to_dict"))
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
@@ -3791,7 +3819,9 @@ class TestDataprocCreateBatchOperator:
self, mock_hook, to_dict_mock, mock_ol_accessible, mock_ol_listener
):
mock_ol_accessible.return_value = False
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
batch = {
diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
index 4124576eaec..5f7ed228070 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
@@ -21,6 +21,7 @@ import os
import sys
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
+from functools import cache
from typing import TYPE_CHECKING
import psutil
@@ -66,8 +67,6 @@ if sys.platform == "darwin":
else:
from setproctitle import getproctitle, setproctitle
-_openlineage_listener: OpenLineageListener | None = None
-
def _executor_initializer():
"""
@@ -806,9 +805,7 @@ class OpenLineageListener:
self.log.debug("Successfully submitted method to executor")
+@cache
def get_openlineage_listener() -> OpenLineageListener:
"""Get singleton listener manager."""
- global _openlineage_listener
- if not _openlineage_listener:
- _openlineage_listener = OpenLineageListener()
- return _openlineage_listener
+ return OpenLineageListener()
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_spark.py b/providers/openlineage/tests/unit/openlineage/utils/test_spark.py
index eb910114789..3dadf68482d 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_spark.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_spark.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import datetime as dt
+from unittest import mock
from unittest.mock import MagicMock, patch
import pytest
@@ -110,16 +111,18 @@ def test_get_parent_job_information_as_spark_properties():
assert result == EXAMPLE_PARENT_JOB_SPARK_PROPERTIES
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
def test_get_transport_information_as_spark_properties(mock_ol_listener):
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(EXAMPLE_HTTP_TRANSPORT_CONFIG)
)
result = _get_transport_information_as_spark_properties()
assert result == EXAMPLE_TRANSPORT_SPARK_PROPERTIES
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
def
test_get_transport_information_as_spark_properties_unsupported_transport_type(mock_ol_listener):
kafka_config = KafkaConfig(
topic="my_topic",
@@ -131,16 +134,20 @@ def
test_get_transport_information_as_spark_properties_unsupported_transport_typ
flush=True,
messageKey="some",
)
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = KafkaTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = KafkaTransport(
kafka_config
)
result = _get_transport_information_as_spark_properties()
assert result == {}
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
def
test_get_transport_information_as_spark_properties_composite_transport_type(mock_ol_listener):
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = CompositeTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = CompositeTransport(
CompositeConfig.from_dict(
{"transports": {"http": EXAMPLE_HTTP_TRANSPORT_CONFIG, "kafka":
EXAMPLE_KAFKA_TRANSPORT_CONFIG}}
)
@@ -298,9 +305,11 @@ def
test_inject_parent_job_information_into_spark_properties(properties, should_
),
],
)
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
def test_inject_transport_information_into_spark_properties(mock_ol_listener,
properties, should_inject):
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(EXAMPLE_HTTP_TRANSPORT_CONFIG)
)
result = inject_transport_information_into_spark_properties(properties,
EXAMPLE_CONTEXT)
@@ -337,11 +346,13 @@ def
test_inject_transport_information_into_spark_properties(mock_ol_listener, pr
),
],
)
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
def test_inject_composite_transport_information_into_spark_properties(
mock_ol_listener, properties, should_inject
):
- mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = CompositeTransport(
+ fake_listener = mock.MagicMock()
+ mock_ol_listener.return_value = fake_listener
+ fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = CompositeTransport(
CompositeConfig(
transports={
"http": EXAMPLE_HTTP_TRANSPORT_CONFIG,