This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2f570c2bf7 Fix when OpenLineage plugins has listener disabled. (#30708)
2f570c2bf7 is described below
commit 2f570c2bf7794e100e6960ba3abe0d6998c1e497
Author: JDarDagran <[email protected]>
AuthorDate: Thu Apr 20 17:13:50 2023 +0200
Fix when OpenLineage plugins has listener disabled. (#30708)
Add parametrized test for disabling OL listener in plugin.
Signed-off-by: Jakub Dardzinski <[email protected]>
---
.../providers/openlineage/extractors/manager.py | 4 +-
.../providers/openlineage/plugins/openlineage.py | 2 +-
.../openlineage/plugins/test_openlineage.py | 48 ++++++++++++++++++++++
3 files changed, 51 insertions(+), 3 deletions(-)
diff --git a/airflow/providers/openlineage/extractors/manager.py
b/airflow/providers/openlineage/extractors/manager.py
index edb21efd3a..9c577dd0f6 100644
--- a/airflow/providers/openlineage/extractors/manager.py
+++ b/airflow/providers/openlineage/extractors/manager.py
@@ -37,13 +37,13 @@ class ExtractorManager(LoggingMixin):
# Comma-separated extractors in OPENLINEAGE_EXTRACTORS variable.
# Extractors should implement BaseExtractor
- from airflow.providers.openlineage.utils import import_from_string
+ from airflow.utils.module_loading import import_string
# TODO: use airflow config with OL backup
env_extractors = os.getenv("OPENLINEAGE_EXTRACTORS")
if env_extractors is not None:
for extractor in env_extractors.split(";"):
- extractor: type[BaseExtractor] =
import_from_string(extractor.strip())
+ extractor: type[BaseExtractor] =
import_string(extractor.strip())
for operator_class in extractor.get_operator_classnames():
self.extractors[operator_class] = extractor
diff --git a/airflow/providers/openlineage/plugins/openlineage.py
b/airflow/providers/openlineage/plugins/openlineage.py
index a53c5cf4e1..6c32866005 100644
--- a/airflow/providers/openlineage/plugins/openlineage.py
+++ b/airflow/providers/openlineage/plugins/openlineage.py
@@ -33,7 +33,7 @@ class OpenLineageProviderPlugin(AirflowPlugin):
name = "OpenLineageProviderPlugin"
macros = [lineage_run_id, lineage_parent_id]
- if _is_disabled():
+ if not _is_disabled():
from airflow.providers.openlineage.plugins.listener import
OpenLineageListener
listeners = [OpenLineageListener()]
diff --git a/tests/providers/openlineage/plugins/test_openlineage.py
b/tests/providers/openlineage/plugins/test_openlineage.py
new file mode 100644
index 0000000000..064d73e2c3
--- /dev/null
+++ b/tests/providers/openlineage/plugins/test_openlineage.py
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import contextlib
+import os
+import sys
+from unittest.mock import patch
+
+import pytest
+
+
+class TestOpenLineageProviderPlugin:
+ def setup_method(self):
+ self.old_modules = dict(sys.modules)
+
+ def teardown_method(self):
+ # Remove any new modules imported during the test run. This lets us
+ # import the same source files for more than one test.
+ for mod in [m for m in sys.modules if m not in self.old_modules]:
+ del sys.modules[mod]
+
+ @pytest.mark.parametrize(
+ "mocks, expected",
+ [([patch.dict(os.environ, {"OPENLINEAGE_DISABLED": "true"}, 0)], 0),
([], 1)],
+ )
+ def test_plugin_disablements(self, mocks, expected):
+ with contextlib.ExitStack() as stack:
+ for mock in mocks:
+ stack.enter_context(mock)
+ from airflow.providers.openlineage.plugins.openlineage import
OpenLineageProviderPlugin
+
+ plugin = OpenLineageProviderPlugin()
+ assert len(plugin.listeners) == expected