This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch openlineage-disable-when-not-configured in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 22c37309087a3c2c14f78c5af6296a8fd5499414 Author: Maciej Obuchowski <[email protected]> AuthorDate: Fri Aug 4 18:45:22 2023 +0200 openlineage: disable running listener if not configured Signed-off-by: Maciej Obuchowski <[email protected]> --- .../providers/openlineage/plugins/openlineage.py | 8 +++- .../openlineage/plugins/test_openlineage.py | 51 ++++++++++++++++++++-- 2 files changed, 54 insertions(+), 5 deletions(-) diff --git a/airflow/providers/openlineage/plugins/openlineage.py b/airflow/providers/openlineage/plugins/openlineage.py index 2ec0801147..318d2d5b17 100644 --- a/airflow/providers/openlineage/plugins/openlineage.py +++ b/airflow/providers/openlineage/plugins/openlineage.py @@ -27,6 +27,12 @@ def _is_disabled() -> bool: return ( conf.getboolean("openlineage", "disabled") or os.getenv("OPENLINEAGE_DISABLED", "false").lower() == "true" + or ( + conf.get("openlineage", "transport") == "" + and conf.get("openlineage", "config_path") == "" + and os.getenv("OPENLINEAGE_URL", "") == "" + and os.getenv("OPENLINEAGE_CONFIG", "") == "" + ) ) @@ -39,8 +45,8 @@ class OpenLineageProviderPlugin(AirflowPlugin): """ name = "OpenLineageProviderPlugin" - macros = [lineage_run_id, lineage_parent_id] if not _is_disabled(): from airflow.providers.openlineage.plugins.listener import OpenLineageListener + macros = [lineage_run_id, lineage_parent_id] listeners = [OpenLineageListener()] diff --git a/tests/providers/openlineage/plugins/test_openlineage.py b/tests/providers/openlineage/plugins/test_openlineage.py index 4fcb0f287d..832a76f641 100644 --- a/tests/providers/openlineage/plugins/test_openlineage.py +++ b/tests/providers/openlineage/plugins/test_openlineage.py @@ -17,6 +17,7 @@ from __future__ import annotations import contextlib +import logging import os import sys from unittest.mock import patch @@ -39,8 +40,28 @@ class TestOpenLineageProviderPlugin: @pytest.mark.parametrize( "mocks, expected", [ - ([patch.dict(os.environ, {"OPENLINEAGE_DISABLED": "true"}, 0)], 0), - ([conf_vars({("openlineage", "disabled"): "False"})], 1), + ([patch.dict(os.environ, {"OPENLINEAGE_DISABLED": "true"})], 0), + ( + [ + conf_vars( + {("openlineage", "transport"): '{"type": "http", "url": "http://localhost:5000"}'} + ), + patch.dict(os.environ, {"OPENLINEAGE_DISABLED": "true"}), + ], + 0, + ), + ([patch.dict(os.environ, {"OPENLINEAGE_DISABLED": "false"})], 0), + ( + [ + conf_vars( + { + ("openlineage", "disabled"): "False", + ("openlineage", "transport"): '{"type": "http", "url": "http://localhost:5000"}', + } + ) + ], + 1, + ), ( [ conf_vars({("openlineage", "disabled"): "False"}), @@ -48,14 +69,36 @@ class TestOpenLineageProviderPlugin: ], 0, ), - ([], 1), + ([], 0), + ([patch.dict(os.environ, {"OPENLINEAGE_URL": "http://localhost:8080"})], 1), + ( + [ + conf_vars( + {("openlineage", "transport"): '{"type": "http", "url": "http://localhost:5000"}'} + ) + ], + 1, + ), ], ) def test_plugin_disablements(self, mocks, expected): with contextlib.ExitStack() as stack: for mock in mocks: stack.enter_context(mock) - from airflow.providers.openlineage.plugins.openlineage import OpenLineageProviderPlugin + from airflow.providers.openlineage.plugins.openlineage import ( + OpenLineageProviderPlugin, + _is_disabled, + ) plugin = OpenLineageProviderPlugin() + + log = logging.getLogger("airflow") + import json + + log.error(json.dumps(dict(os.environ), sort_keys=True)) + log.error(os.getenv("OPENLINEAGE_URL", "")) + log.error(os.getenv("OPENLINEAGE_DISABLED", "")) + log.error(expected) + log.error(_is_disabled()) + assert len(plugin.listeners) == expected
