This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 11ff650e1b openlineage: disable running listener if not configured
(#33120)
11ff650e1b is described below
commit 11ff650e1b122aadebcea462adfae5492a76ed94
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Fri Aug 4 20:14:04 2023 +0200
openlineage: disable running listener if not configured (#33120)
Signed-off-by: Maciej Obuchowski <[email protected]>
---
.../providers/openlineage/plugins/openlineage.py | 8 ++++-
.../openlineage/plugins/test_openlineage.py | 36 ++++++++++++++++++++--
2 files changed, 40 insertions(+), 4 deletions(-)
diff --git a/airflow/providers/openlineage/plugins/openlineage.py
b/airflow/providers/openlineage/plugins/openlineage.py
index 2ec0801147..318d2d5b17 100644
--- a/airflow/providers/openlineage/plugins/openlineage.py
+++ b/airflow/providers/openlineage/plugins/openlineage.py
@@ -27,6 +27,12 @@ def _is_disabled() -> bool:
return (
conf.getboolean("openlineage", "disabled")
or os.getenv("OPENLINEAGE_DISABLED", "false").lower() == "true"
+ or (
+ conf.get("openlineage", "transport") == ""
+ and conf.get("openlineage", "config_path") == ""
+ and os.getenv("OPENLINEAGE_URL", "") == ""
+ and os.getenv("OPENLINEAGE_CONFIG", "") == ""
+ )
)
@@ -39,8 +45,8 @@ class OpenLineageProviderPlugin(AirflowPlugin):
"""
name = "OpenLineageProviderPlugin"
- macros = [lineage_run_id, lineage_parent_id]
if not _is_disabled():
from airflow.providers.openlineage.plugins.listener import
OpenLineageListener
+ macros = [lineage_run_id, lineage_parent_id]
listeners = [OpenLineageListener()]
diff --git a/tests/providers/openlineage/plugins/test_openlineage.py
b/tests/providers/openlineage/plugins/test_openlineage.py
index 4fcb0f287d..fa41bc6aa7 100644
--- a/tests/providers/openlineage/plugins/test_openlineage.py
+++ b/tests/providers/openlineage/plugins/test_openlineage.py
@@ -39,8 +39,28 @@ class TestOpenLineageProviderPlugin:
@pytest.mark.parametrize(
"mocks, expected",
[
- ([patch.dict(os.environ, {"OPENLINEAGE_DISABLED": "true"}, 0)], 0),
- ([conf_vars({("openlineage", "disabled"): "False"})], 1),
+ ([patch.dict(os.environ, {"OPENLINEAGE_DISABLED": "true"})], 0),
+ (
+ [
+ conf_vars(
+ {("openlineage", "transport"): '{"type": "http",
"url": "http://localhost:5000"}'}
+ ),
+ patch.dict(os.environ, {"OPENLINEAGE_DISABLED": "true"}),
+ ],
+ 0,
+ ),
+ ([patch.dict(os.environ, {"OPENLINEAGE_DISABLED": "false"})], 0),
+ (
+ [
+ conf_vars(
+ {
+ ("openlineage", "disabled"): "False",
+ ("openlineage", "transport"): '{"type": "http",
"url": "http://localhost:5000"}',
+ }
+ )
+ ],
+ 1,
+ ),
(
[
conf_vars({("openlineage", "disabled"): "False"}),
@@ -48,7 +68,16 @@ class TestOpenLineageProviderPlugin:
],
0,
),
- ([], 1),
+ ([], 0),
+ ([patch.dict(os.environ, {"OPENLINEAGE_URL":
"http://localhost:8080"})], 1),
+ (
+ [
+ conf_vars(
+ {("openlineage", "transport"): '{"type": "http",
"url": "http://localhost:5000"}'}
+ )
+ ],
+ 1,
+ ),
],
)
def test_plugin_disablements(self, mocks, expected):
@@ -58,4 +87,5 @@ class TestOpenLineageProviderPlugin:
from airflow.providers.openlineage.plugins.openlineage import
OpenLineageProviderPlugin
plugin = OpenLineageProviderPlugin()
+
assert len(plugin.listeners) == expected