This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b30bfa34fae Remove global from openlineage provider (#58868)
b30bfa34fae is described below

commit b30bfa34fae8c7abf1a6dcf93ffc8b073e412c55
Author: Jens Scheffler <[email protected]>
AuthorDate: Mon Dec 1 22:29:26 2025 +0100

    Remove global from openlineage provider (#58868)
    
    * Remove global from openlineage provider
    
    * Fix pytests based on global variable
---
 .../unit/google/cloud/openlineage/test_utils.py    | 37 ++++++---
 .../unit/google/cloud/operators/test_dataproc.py   | 90 ++++++++++++++--------
 .../providers/openlineage/plugins/listener.py      |  9 +--
 .../tests/unit/openlineage/utils/test_spark.py     | 31 +++++---
 4 files changed, 109 insertions(+), 58 deletions(-)

diff --git a/providers/google/tests/unit/google/cloud/openlineage/test_utils.py 
b/providers/google/tests/unit/google/cloud/openlineage/test_utils.py
index 73d1805c35d..e7bc9b22d9f 100644
--- a/providers/google/tests/unit/google/cloud/openlineage/test_utils.py
+++ b/providers/google/tests/unit/google/cloud/openlineage/test_utils.py
@@ -18,6 +18,7 @@ from __future__ import annotations
 
 import datetime as dt
 import json
+from unittest import mock
 from unittest.mock import MagicMock, patch
 
 import pytest
@@ -624,13 +625,15 @@ def 
test_inject_openlineage_properties_into_dataproc_job_parent_info_only(mock_i
     assert result == {"sparkJob": {"properties": expected_properties}}
 
 
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
 
@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
 def test_inject_openlineage_properties_into_dataproc_job_transport_info_only(
     mock_is_ol_accessible, mock_ol_listener
 ):
     mock_is_ol_accessible.return_value = True
-    
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+    fake_listener = mock.MagicMock()
+    mock_ol_listener.return_value = fake_listener
+    
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
         HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
     )
     expected_properties = {
@@ -642,13 +645,15 @@ def 
test_inject_openlineage_properties_into_dataproc_job_transport_info_only(
     assert result == {"sparkJob": {"properties": expected_properties}}
 
 
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
 
@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
 def test_inject_openlineage_properties_into_dataproc_job_all_injections(
     mock_is_ol_accessible, mock_ol_listener
 ):
     mock_is_ol_accessible.return_value = True
-    
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+    fake_listener = mock.MagicMock()
+    mock_ol_listener.return_value = fake_listener
+    
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
         HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
     )
     expected_properties = {
@@ -899,13 +904,15 @@ def 
test_inject_openlineage_properties_into_dataproc_batch_parent_info_only(mock
     assert result == expected_batch
 
 
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
 
@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
 def test_inject_openlineage_properties_into_dataproc_batch_transport_info_only(
     mock_is_ol_accessible, mock_ol_listener
 ):
     mock_is_ol_accessible.return_value = True
-    
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+    fake_listener = mock.MagicMock()
+    mock_ol_listener.return_value = fake_listener
+    
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
         HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
     )
     expected_properties = {"existingProperty": "value", 
**OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_SPARK_PROPERTIES}
@@ -917,13 +924,15 @@ def 
test_inject_openlineage_properties_into_dataproc_batch_transport_info_only(
     assert result == expected_batch
 
 
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
 
@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
 def test_inject_openlineage_properties_into_dataproc_batch_all_injections(
     mock_is_ol_accessible, mock_ol_listener
 ):
     mock_is_ol_accessible.return_value = True
-    
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+    fake_listener = mock.MagicMock()
+    mock_ol_listener.return_value = fake_listener
+    
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
         HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
     )
     expected_properties = {
@@ -1072,13 +1081,15 @@ def 
test_inject_openlineage_properties_into_dataproc_workflow_template_parent_in
     assert result == expected_template
 
 
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
 
@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
 def 
test_inject_openlineage_properties_into_dataproc_workflow_template_transport_info_only(
     mock_is_ol_accessible, mock_ol_listener
 ):
     mock_is_ol_accessible.return_value = True
-    
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+    fake_listener = mock.MagicMock()
+    mock_ol_listener.return_value = fake_listener
+    
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
         HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
     )
     template = {
@@ -1154,13 +1165,15 @@ def 
test_inject_openlineage_properties_into_dataproc_workflow_template_transport
     assert result == expected_template
 
 
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
 
@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
 def 
test_inject_openlineage_properties_into_dataproc_workflow_template_all_injections(
     mock_is_ol_accessible, mock_ol_listener
 ):
     mock_is_ol_accessible.return_value = True
-    
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+    fake_listener = mock.MagicMock()
+    mock_ol_listener.return_value = fake_listener
+    
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
         HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
     )
     template = {
diff --git 
a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py 
b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
index 56b59009ba3..d525e97d5a5 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
@@ -1506,7 +1506,7 @@ class TestDataprocSubmitJobOperator(DataprocJobTestBase):
         )
 
     
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
-    
@mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+    
@mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
     
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def test_execute_openlineage_http_transport_info_injection(
@@ -1514,7 +1514,9 @@ class TestDataprocSubmitJobOperator(DataprocJobTestBase):
     ):
         mock_ol_accessible.return_value = True
         mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
-        
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+        fake_listener = mock.MagicMock()
+        mock_ol_listener.return_value = fake_listener
+        
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
             HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
         )
         job_config = {
@@ -1561,7 +1563,7 @@ class TestDataprocSubmitJobOperator(DataprocJobTestBase):
         )
 
     
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
-    
@mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+    
@mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
     
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def test_execute_openlineage_all_info_injection(
@@ -1569,7 +1571,9 @@ class TestDataprocSubmitJobOperator(DataprocJobTestBase):
     ):
         mock_ol_accessible.return_value = True
         mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
-        
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+        fake_listener = mock.MagicMock()
+        mock_ol_listener.return_value = fake_listener
+        
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
             HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
         )
         job_config = {
@@ -1617,7 +1621,7 @@ class TestDataprocSubmitJobOperator(DataprocJobTestBase):
             metadata=METADATA,
         )
 
-    
@mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+    
@mock.patch("airflow.providers.openlineage.plugins.listener.get_openlineage_listener")
     
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def test_execute_openlineage_unsupported_transport_info_injection(
@@ -1634,7 +1638,9 @@ class TestDataprocSubmitJobOperator(DataprocJobTestBase):
             flush=True,
             messageKey="some",
         )
-        
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = KafkaTransport(
+        fake_listener = mock.MagicMock()
+        mock_ol_listener.return_value = fake_listener
+        
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
KafkaTransport(
             kafka_config
         )
         job_config = {
@@ -1710,14 +1716,16 @@ class 
TestDataprocSubmitJobOperator(DataprocJobTestBase):
             metadata=METADATA,
         )
 
-    
@mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+    
@mock.patch("airflow.providers.openlineage.plugins.listener.get_openlineage_listener")
     
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def 
test_execute_openlineage_transport_info_injection_skipped_when_already_present(
         self, mock_hook, mock_ol_accessible, mock_ol_listener
     ):
         mock_ol_accessible.return_value = True
-        
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+        fake_listener = mock.MagicMock()
+        mock_ol_listener.return_value = fake_listener
+        
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
             HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
         )
         job_config = {
@@ -1793,14 +1801,16 @@ class 
TestDataprocSubmitJobOperator(DataprocJobTestBase):
             metadata=METADATA,
         )
 
-    
@mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+    
@mock.patch("airflow.providers.openlineage.plugins.listener.get_openlineage_listener")
     
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def 
test_execute_openlineage_transport_info_injection_skipped_by_default_unless_enabled(
         self, mock_hook, mock_ol_accessible, mock_ol_listener
     ):
         mock_ol_accessible.return_value = True
-        
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+        fake_listener = mock.MagicMock()
+        mock_ol_listener.return_value = fake_listener
+        
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
             HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
         )
         job_config = {
@@ -1875,14 +1885,16 @@ class 
TestDataprocSubmitJobOperator(DataprocJobTestBase):
             metadata=METADATA,
         )
 
-    
@mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+    
@mock.patch("airflow.providers.openlineage.plugins.listener.get_openlineage_listener")
     
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def 
test_execute_openlineage_transport_info_injection_skipped_when_ol_not_accessible(
         self, mock_hook, mock_ol_accessible, mock_ol_listener
     ):
         mock_ol_accessible.return_value = False
-        
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+        fake_listener = mock.MagicMock()
+        mock_ol_listener.return_value = fake_listener
+        
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
             HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
         )
         job_config = {
@@ -2783,7 +2795,7 @@ class 
TestDataprocWorkflowTemplateInstantiateInlineOperator:
         )
 
     
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
-    
@mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+    
@mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
     
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def test_execute_openlineage_transport_info_injection(
@@ -2791,7 +2803,9 @@ class 
TestDataprocWorkflowTemplateInstantiateInlineOperator:
     ):
         mock_ol_accessible.return_value = True
         mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
-        
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+        fake_listener = mock.MagicMock()
+        mock_ol_listener.return_value = fake_listener
+        
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
             HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
         )
 
@@ -2889,7 +2903,7 @@ class 
TestDataprocWorkflowTemplateInstantiateInlineOperator:
         )
 
     
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
-    
@mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+    
@mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
     
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def test_execute_openlineage_all_info_injection(
@@ -2897,7 +2911,9 @@ class 
TestDataprocWorkflowTemplateInstantiateInlineOperator:
     ):
         mock_ol_accessible.return_value = True
         mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
-        
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+        fake_listener = mock.MagicMock()
+        mock_ol_listener.return_value = fake_listener
+        
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
             HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
         )
         template = {
@@ -3017,14 +3033,16 @@ class 
TestDataprocWorkflowTemplateInstantiateInlineOperator:
             metadata=METADATA,
         )
 
-    
@mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+    
@mock.patch("airflow.providers.openlineage.plugins.listener.get_openlineage_listener")
     
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def 
test_execute_openlineage_transport_info_injection_skipped_by_default_unless_enabled(
         self, mock_hook, mock_ol_accessible, mock_ol_listener
     ):
         mock_ol_accessible.return_value = True
-        
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+        fake_listener = mock.MagicMock()
+        mock_ol_listener.return_value = fake_listener
+        
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
             HttpConfig("https://some-custom.url")
         )
 
@@ -3065,14 +3083,16 @@ class 
TestDataprocWorkflowTemplateInstantiateInlineOperator:
             metadata=METADATA,
         )
 
-    
@mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+    
@mock.patch("airflow.providers.openlineage.plugins.listener.get_openlineage_listener")
     
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def 
test_execute_openlineage_transport_info_injection_skipped_when_ol_not_accessible(
         self, mock_hook, mock_ol_accessible, mock_ol_listener
     ):
         mock_ol_accessible.return_value = False
-        
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+        fake_listener = mock.MagicMock()
+        mock_ol_listener.return_value = fake_listener
+        
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
             HttpConfig("https://some-custom.url")
         )
 
@@ -3465,7 +3485,7 @@ class TestDataprocCreateBatchOperator:
 
     @mock.patch.object(DataprocCreateBatchOperator, "log", 
new_callable=mock.MagicMock)
     
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
-    
@mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+    
@mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
     
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("Batch.to_dict"))
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
@@ -3474,7 +3494,9 @@ class TestDataprocCreateBatchOperator:
     ):
         mock_ol_accessible.return_value = True
         mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
-        
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+        fake_listener = mock.MagicMock()
+        mock_ol_listener.return_value = fake_listener
+        
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
             HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
         )
         expected_batch = {
@@ -3522,7 +3544,7 @@ class TestDataprocCreateBatchOperator:
         )
 
     
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
-    
@mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+    
@mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
     
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("Batch.to_dict"))
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
@@ -3531,7 +3553,9 @@ class TestDataprocCreateBatchOperator:
     ):
         mock_ol_accessible.return_value = True
         mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
-        
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+        fake_listener = mock.MagicMock()
+        mock_ol_listener.return_value = fake_listener
+        
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
             HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
         )
         expected_batch = {
@@ -3617,7 +3641,7 @@ class TestDataprocCreateBatchOperator:
             metadata=METADATA,
         )
 
-    
@mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+    
@mock.patch("airflow.providers.openlineage.plugins.listener.get_openlineage_listener")
     
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("Batch.to_dict"))
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
@@ -3625,7 +3649,9 @@ class TestDataprocCreateBatchOperator:
         self, mock_hook, to_dict_mock, mock_ol_accessible, mock_ol_listener
     ):
         mock_ol_accessible.return_value = True
-        
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+        fake_listener = mock.MagicMock()
+        mock_ol_listener.return_value = fake_listener
+        
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
             HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
         )
         batch = {
@@ -3703,7 +3729,7 @@ class TestDataprocCreateBatchOperator:
             metadata=METADATA,
         )
 
-    
@mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+    
@mock.patch("airflow.providers.openlineage.plugins.listener.get_openlineage_listener")
     
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("Batch.to_dict"))
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
@@ -3711,7 +3737,9 @@ class TestDataprocCreateBatchOperator:
         self, mock_hook, to_dict_mock, mock_ol_accessible, mock_ol_listener
     ):
         mock_ol_accessible.return_value = True
-        
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+        fake_listener = mock.MagicMock()
+        mock_ol_listener.return_value = fake_listener
+        
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
             HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
         )
         batch = {
@@ -3783,7 +3811,7 @@ class TestDataprocCreateBatchOperator:
             metadata=METADATA,
         )
 
-    
@mock.patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+    
@mock.patch("airflow.providers.openlineage.plugins.listener.get_openlineage_listener")
     
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("Batch.to_dict"))
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
@@ -3791,7 +3819,9 @@ class TestDataprocCreateBatchOperator:
         self, mock_hook, to_dict_mock, mock_ol_accessible, mock_ol_listener
     ):
         mock_ol_accessible.return_value = False
-        
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+        fake_listener = mock.MagicMock()
+        mock_ol_listener.return_value = fake_listener
+        
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
             HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
         )
         batch = {
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py 
b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
index 4124576eaec..5f7ed228070 100644
--- 
a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
+++ 
b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
@@ -21,6 +21,7 @@ import os
 import sys
 from concurrent.futures import ProcessPoolExecutor
 from datetime import datetime
+from functools import cache
 from typing import TYPE_CHECKING
 
 import psutil
@@ -66,8 +67,6 @@ if sys.platform == "darwin":
 else:
     from setproctitle import getproctitle, setproctitle
 
-_openlineage_listener: OpenLineageListener | None = None
-
 
 def _executor_initializer():
     """
@@ -806,9 +805,7 @@ class OpenLineageListener:
             self.log.debug("Successfully submitted method to executor")
 
 
+@cache
 def get_openlineage_listener() -> OpenLineageListener:
     """Get singleton listener manager."""
-    global _openlineage_listener
-    if not _openlineage_listener:
-        _openlineage_listener = OpenLineageListener()
-    return _openlineage_listener
+    return OpenLineageListener()
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_spark.py 
b/providers/openlineage/tests/unit/openlineage/utils/test_spark.py
index eb910114789..3dadf68482d 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_spark.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_spark.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import datetime as dt
+from unittest import mock
 from unittest.mock import MagicMock, patch
 
 import pytest
@@ -110,16 +111,18 @@ def test_get_parent_job_information_as_spark_properties():
     assert result == EXAMPLE_PARENT_JOB_SPARK_PROPERTIES
 
 
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
 def test_get_transport_information_as_spark_properties(mock_ol_listener):
-    
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+    fake_listener = mock.MagicMock()
+    mock_ol_listener.return_value = fake_listener
+    
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
         HttpConfig.from_dict(EXAMPLE_HTTP_TRANSPORT_CONFIG)
     )
     result = _get_transport_information_as_spark_properties()
     assert result == EXAMPLE_TRANSPORT_SPARK_PROPERTIES
 
 
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
 def 
test_get_transport_information_as_spark_properties_unsupported_transport_type(mock_ol_listener):
     kafka_config = KafkaConfig(
         topic="my_topic",
@@ -131,16 +134,20 @@ def 
test_get_transport_information_as_spark_properties_unsupported_transport_typ
         flush=True,
         messageKey="some",
     )
-    
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = KafkaTransport(
+    fake_listener = mock.MagicMock()
+    mock_ol_listener.return_value = fake_listener
+    
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
KafkaTransport(
         kafka_config
     )
     result = _get_transport_information_as_spark_properties()
     assert result == {}
 
 
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
 def 
test_get_transport_information_as_spark_properties_composite_transport_type(mock_ol_listener):
-    
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = CompositeTransport(
+    fake_listener = mock.MagicMock()
+    mock_ol_listener.return_value = fake_listener
+    
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
CompositeTransport(
         CompositeConfig.from_dict(
             {"transports": {"http": EXAMPLE_HTTP_TRANSPORT_CONFIG, "kafka": 
EXAMPLE_KAFKA_TRANSPORT_CONFIG}}
         )
@@ -298,9 +305,11 @@ def 
test_inject_parent_job_information_into_spark_properties(properties, should_
         ),
     ],
 )
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
 def test_inject_transport_information_into_spark_properties(mock_ol_listener, 
properties, should_inject):
-    
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = HttpTransport(
+    fake_listener = mock.MagicMock()
+    mock_ol_listener.return_value = fake_listener
+    
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
HttpTransport(
         HttpConfig.from_dict(EXAMPLE_HTTP_TRANSPORT_CONFIG)
     )
     result = inject_transport_information_into_spark_properties(properties, 
EXAMPLE_CONTEXT)
@@ -337,11 +346,13 @@ def 
test_inject_transport_information_into_spark_properties(mock_ol_listener, pr
         ),
     ],
 )
-@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
+@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
 def test_inject_composite_transport_information_into_spark_properties(
     mock_ol_listener, properties, should_inject
 ):
-    
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport
 = CompositeTransport(
+    fake_listener = mock.MagicMock()
+    mock_ol_listener.return_value = fake_listener
+    
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = 
CompositeTransport(
         CompositeConfig(
             transports={
                 "http": EXAMPLE_HTTP_TRANSPORT_CONFIG,

Reply via email to