This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8bc6c32366 chore: Add more OpenLineage logs to facilitate debugging (#39136)
8bc6c32366 is described below
commit 8bc6c32366e723c897c0c4be3b3026c61314b519
Author: Kacper Muda <[email protected]>
AuthorDate: Mon May 13 18:54:48 2024 +0200
chore: Add more OpenLineage logs to facilitate debugging (#39136)
Signed-off-by: Kacper Muda <[email protected]>
---
airflow/providers/openlineage/conf.py | 17 +++---
airflow/providers/openlineage/extractors/base.py | 7 +++
airflow/providers/openlineage/extractors/bash.py | 4 ++
.../providers/openlineage/extractors/manager.py | 8 ++-
airflow/providers/openlineage/extractors/python.py | 5 ++
airflow/providers/openlineage/plugins/adapter.py | 17 +++++-
airflow/providers/openlineage/plugins/listener.py | 59 ++++++++++++++++--
airflow/providers/openlineage/provider.yaml | 2 +-
tests/providers/openlineage/test_conf.py | 69 ++++++++++++++++++++++
9 files changed, 172 insertions(+), 16 deletions(-)
diff --git a/airflow/providers/openlineage/conf.py b/airflow/providers/openlineage/conf.py
index 4ca42eedfd..d43806abca 100644
--- a/airflow/providers/openlineage/conf.py
+++ b/airflow/providers/openlineage/conf.py
@@ -26,6 +26,10 @@ from airflow.configuration import conf
_CONFIG_SECTION = "openlineage"
+def _is_true(arg: Any) -> bool:
+ return str(arg).lower().strip() in ("true", "1", "t")
+
+
@cache
def config_path(check_legacy_env_var: bool = True) -> str:
"""[openlineage] config_path."""
@@ -41,7 +45,8 @@ def is_source_enabled() -> bool:
option = conf.get(_CONFIG_SECTION, "disable_source_code", fallback="")
if not option:
option = os.getenv("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", "")
- return option.lower() not in ("true", "1", "t")
+ # when disable_source_code is True, is_source_enabled() should be False
+ return not _is_true(option)
@cache
@@ -53,7 +58,9 @@ def disabled_operators() -> set[str]:
@cache
def selective_enable() -> bool:
- return conf.getboolean(_CONFIG_SECTION, "selective_enable", fallback=False)
+ """[openlineage] selective_enable."""
+ option = conf.get(_CONFIG_SECTION, "selective_enable", fallback="")
+ return _is_true(option)
@cache
@@ -85,11 +92,7 @@ def transport() -> dict[str, Any]:
@cache
def is_disabled() -> bool:
- """[openlineage] disabled + some extra checks."""
-
- def _is_true(val):
- return str(val).lower().strip() in ("true", "1", "t")
-
+ """[openlineage] disabled + check if any configuration is present."""
option = conf.get(_CONFIG_SECTION, "disabled", fallback="")
if _is_true(option):
return True
diff --git a/airflow/providers/openlineage/extractors/base.py b/airflow/providers/openlineage/extractors/base.py
index 2f5f957b5b..6fa805b6fb 100644
--- a/airflow/providers/openlineage/extractors/base.py
+++ b/airflow/providers/openlineage/extractors/base.py
@@ -87,6 +87,9 @@ class DefaultExtractor(BaseExtractor):
def _execute_extraction(self) -> OperatorLineage | None:
# OpenLineage methods are optional - if there's no method, return None
try:
+ self.log.debug(
+ "Trying to execute `get_openlineage_facets_on_start` for %s.", self.operator.task_type
+ )
return self._get_openlineage_facets(self.operator.get_openlineage_facets_on_start) # type: ignore
except ImportError:
self.log.error(
@@ -105,9 +108,13 @@ class DefaultExtractor(BaseExtractor):
if task_instance.state == TaskInstanceState.FAILED:
on_failed = getattr(self.operator, "get_openlineage_facets_on_failure", None)
if on_failed and callable(on_failed):
+ self.log.debug(
+ "Executing `get_openlineage_facets_on_failure` for %s.", self.operator.task_type
+ )
return self._get_openlineage_facets(on_failed, task_instance)
on_complete = getattr(self.operator, "get_openlineage_facets_on_complete", None)
if on_complete and callable(on_complete):
+ self.log.debug("Executing `get_openlineage_facets_on_complete` for %s.", self.operator.task_type)
return self._get_openlineage_facets(on_complete, task_instance)
return self.extract()
diff --git a/airflow/providers/openlineage/extractors/bash.py b/airflow/providers/openlineage/extractors/bash.py
index d0213fc6fb..39c3c10781 100644
--- a/airflow/providers/openlineage/extractors/bash.py
+++ b/airflow/providers/openlineage/extractors/bash.py
@@ -53,6 +53,10 @@ class BashExtractor(BaseExtractor):
source=self.operator.bash_command,
)
}
+ else:
+ self.log.debug(
+ "OpenLineage disable_source_code option is on - no source code is extracted.",
+ )
return OperatorLineage(
job_facets=job_facets,
diff --git a/airflow/providers/openlineage/extractors/manager.py b/airflow/providers/openlineage/extractors/manager.py
index 2da7887556..d9bc49350d 100644
--- a/airflow/providers/openlineage/extractors/manager.py
+++ b/airflow/providers/openlineage/extractors/manager.py
@@ -65,12 +65,18 @@ class ExtractorManager(LoggingMixin):
for operator_class in extractor.get_operator_classnames():
if operator_class in self.extractors:
self.log.debug(
- "Duplicate extractor found for `%s`. `%s` will be used instead of `%s`",
+ "Duplicate OpenLineage custom extractor found for `%s`. "
+ "`%s` will be used instead of `%s`",
operator_class,
extractor_path,
self.extractors[operator_class],
)
self.extractors[operator_class] = extractor
+ self.log.debug(
+ "Registered custom OpenLineage extractor `%s` for class `%s`",
+ extractor_path,
+ operator_class,
+ )
def add_extractor(self, operator_class: str, extractor: type[BaseExtractor]):
self.extractors[operator_class] = extractor
diff --git a/airflow/providers/openlineage/extractors/python.py b/airflow/providers/openlineage/extractors/python.py
index 4209926623..8f7efad093 100644
--- a/airflow/providers/openlineage/extractors/python.py
+++ b/airflow/providers/openlineage/extractors/python.py
@@ -57,6 +57,11 @@ class PythonExtractor(BaseExtractor):
source=source_code,
)
}
+ else:
+ self.log.debug(
+ "OpenLineage disable_source_code option is on - no source code is extracted.",
+ )
+
return OperatorLineage(
job_facets=job_facet,
# The PythonOperator is recorded as an "unknownSource" even though we have an extractor,
diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py
index 7ee82b5893..d5aa8da759 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -73,8 +73,16 @@ class OpenLineageAdapter(LoggingMixin):
if not self._client:
config = self.get_openlineage_config()
if config:
+ self.log.debug(
+ "OpenLineage configuration found. Transport type: `%s`",
+ config.get("type", "no type provided"),
+ )
self._client = OpenLineageClient.from_dict(config=config)
else:
+ self.log.debug(
+ "OpenLineage configuration not found directly in Airflow. "
+ "Looking for legacy environment configuration. "
+ )
self._client = OpenLineageClient.from_environment()
return self._client
@@ -85,13 +93,19 @@ class OpenLineageAdapter(LoggingMixin):
config = self._read_yaml_config(openlineage_config_path)
if config:
return config.get("transport", None)
+ self.log.debug("OpenLineage config file is empty: `%s`", openlineage_config_path)
+ else:
+ self.log.debug("OpenLineage config_path configuration not found.")
+
# Second, try to get transport config
transport_config = conf.transport()
if not transport_config:
+ self.log.debug("OpenLineage transport configuration not found.")
return None
return transport_config
- def _read_yaml_config(self, path: str) -> dict | None:
+ @staticmethod
+ def _read_yaml_config(path: str) -> dict | None:
with open(path) as config_file:
return yaml.safe_load(config_file)
@@ -125,6 +139,7 @@ class OpenLineageAdapter(LoggingMixin):
stack.enter_context(Stats.timer(f"ol.emit.attempts.{event_type}.{transport_type}"))
stack.enter_context(Stats.timer("ol.emit.attempts"))
self._client.emit(redacted_event)
+ self.log.debug("Successfully emitted OpenLineage event of id %s", event.run.runId)
except Exception as e:
Stats.incr("ol.emit.failed")
self.log.warning("Failed to emit OpenLineage event of id %s", event.run.runId)
diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 03c60059d6..9067d53f69 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -89,13 +89,19 @@ class OpenLineageListener:
dag = task.dag
if is_operator_disabled(task):
self.log.debug(
- "Skipping OpenLineage event emission for operator %s "
+ "Skipping OpenLineage event emission for operator `%s` "
"due to its presence in [openlineage] disabled_for_operators.",
task.task_type,
)
- return None
+ return
if not is_selective_lineage_enabled(task):
+ self.log.debug(
+ "Skipping OpenLineage event emission for task `%s` "
+ "due to lack of explicit lineage enablement for task or DAG while "
+ "[openlineage] selective_enable is on.",
+ task.task_id,
+ )
return
@print_warning(self.log)
@@ -157,15 +163,22 @@ class OpenLineageListener:
if TYPE_CHECKING:
assert task
dag = task.dag
+
if is_operator_disabled(task):
self.log.debug(
- "Skipping OpenLineage event emission for operator %s "
+ "Skipping OpenLineage event emission for operator `%s` "
"due to its presence in [openlineage] disabled_for_operators.",
task.task_type,
)
- return None
+ return
if not is_selective_lineage_enabled(task):
+ self.log.debug(
+ "Skipping OpenLineage event emission for task `%s` "
+ "due to lack of explicit lineage enablement for task or DAG while "
+ "[openlineage] selective_enable is on.",
+ task.task_id,
+ )
return
@print_warning(self.log)
@@ -212,15 +225,22 @@ class OpenLineageListener:
if TYPE_CHECKING:
assert task
dag = task.dag
+
if is_operator_disabled(task):
self.log.debug(
- "Skipping OpenLineage event emission for operator %s "
+ "Skipping OpenLineage event emission for operator `%s` "
"due to its presence in [openlineage] disabled_for_operators.",
task.task_type,
)
- return None
+ return
if not is_selective_lineage_enabled(task):
+ self.log.debug(
+ "Skipping OpenLineage event emission for task `%s` "
+ "due to lack of explicit lineage enablement for task or DAG while "
+ "[openlineage] selective_enable is on.",
+ task.task_id,
+ )
return
@print_warning(self.log)
@@ -277,7 +297,18 @@ class OpenLineageListener:
@hookimpl
def on_dag_run_running(self, dag_run: DagRun, msg: str):
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
+ self.log.debug(
+ "Skipping OpenLineage event emission for DAG `%s` "
+ "due to lack of explicit lineage enablement for DAG while "
+ "[openlineage] selective_enable is on.",
+ dag_run.dag_id,
+ )
return
+
+ if not self.executor:
+ self.log.debug("Executor have not started before `on_dag_run_running`")
+ return
+
data_interval_start = dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None
data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None
self.executor.submit(
@@ -291,19 +322,35 @@ class OpenLineageListener:
@hookimpl
def on_dag_run_success(self, dag_run: DagRun, msg: str):
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
+ self.log.debug(
+ "Skipping OpenLineage event emission for DAG `%s` "
+ "due to lack of explicit lineage enablement for DAG while "
+ "[openlineage] selective_enable is on.",
+ dag_run.dag_id,
+ )
return
+
if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_success`")
return
+
self.executor.submit(self.adapter.dag_success, dag_run=dag_run, msg=msg)
@hookimpl
def on_dag_run_failed(self, dag_run: DagRun, msg: str):
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
+ self.log.debug(
+ "Skipping OpenLineage event emission for DAG `%s` "
+ "due to lack of explicit lineage enablement for DAG while "
+ "[openlineage] selective_enable is on.",
+ dag_run.dag_id,
+ )
return
+
if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_failed`")
return
+
self.executor.submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg)
diff --git a/airflow/providers/openlineage/provider.yaml b/airflow/providers/openlineage/provider.yaml
index afd3cf95d0..4df5f0d008 100644
--- a/airflow/providers/openlineage/provider.yaml
+++ b/airflow/providers/openlineage/provider.yaml
@@ -131,7 +131,7 @@ config:
Disable the inclusion of source code in OpenLineage events by setting this to `true`.
By default, several Operators (e.g. Python, Bash) will include their source code in the events
unless disabled.
- default: ~
+ default: "False"
example: ~
type: boolean
version_added: ~
diff --git a/tests/providers/openlineage/test_conf.py b/tests/providers/openlineage/test_conf.py
index 95a7cde3e5..6271481689 100644
--- a/tests/providers/openlineage/test_conf.py
+++ b/tests/providers/openlineage/test_conf.py
@@ -22,12 +22,14 @@ from unittest import mock
import pytest
from airflow.providers.openlineage.conf import (
+ _is_true,
config_path,
custom_extractors,
disabled_operators,
is_disabled,
is_source_enabled,
namespace,
+ selective_enable,
transport,
)
from tests.test_utils.config import conf_vars, env_vars
@@ -46,6 +48,22 @@ _CONFIG_OPTION_TRANSPORT = "transport"
_VAR_DISABLED = "OPENLINEAGE_DISABLED"
_CONFIG_OPTION_DISABLED = "disabled"
_VAR_URL = "OPENLINEAGE_URL"
+_CONFIG_OPTION_SELECTIVE_ENABLE = "selective_enable"
+
+_BOOL_PARAMS = (
+ ("1", True),
+ ("t", True),
+ ("T", True),
+ ("tRuE ", True),
+ (" true", True),
+ ("TRUE", True),
+ ("0", False),
+ ("f", False),
+ ("F", False),
+ (" fAlSe", False),
+ ("false ", False),
+ ("FALSE", False),
+)
@pytest.fixture(autouse=True)
@@ -57,6 +75,7 @@ def clear_cache():
namespace.cache_clear()
transport.cache_clear()
is_disabled.cache_clear()
+ selective_enable.cache_clear()
try:
yield
finally:
@@ -67,6 +86,21 @@ def clear_cache():
namespace.cache_clear()
transport.cache_clear()
is_disabled.cache_clear()
+ selective_enable.cache_clear()
+
+
[email protected](
+ ("var_string", "expected"),
+ (
+ *_BOOL_PARAMS,
+ ("some_string", False),
+ ("aasd123", False),
+ (True, True),
+ (False, False),
+ ),
+)
+def test_is_true(var_string, expected):
+ assert _is_true(var_string) is expected
@env_vars({_VAR_CONFIG_PATH: "env_var_path"})
@@ -91,6 +125,16 @@ def test_config_path_do_not_fail_if_conf_option_missing():
assert config_path() == ""
[email protected](
+ ("var_string", "expected"),
+ _BOOL_PARAMS,
+)
+def test_disable_source_code(var_string, expected):
+ with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_DISABLE_SOURCE_CODE): var_string}):
+ result = is_source_enabled()
+ assert result is not expected # conf is disabled_... and func is enabled_... hence the `not` here
+
+
@env_vars({_VAR_DISABLE_SOURCE_CODE: "true"})
@conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_DISABLE_SOURCE_CODE): None})
def test_disable_source_code_legacy_env_var_is_used_when_no_conf_option_set():
@@ -124,6 +168,31 @@ def test_disable_source_code_do_not_fail_if_conf_option_missing():
assert is_source_enabled() is True
[email protected](
+ ("var_string", "expected"),
+ _BOOL_PARAMS,
+)
+def test_selective_enable(var_string, expected):
+ with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_SELECTIVE_ENABLE): var_string}):
+ result = selective_enable()
+ assert result is expected
+
+
+@conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_SELECTIVE_ENABLE): "asdadawlaksnd"})
+def test_selective_enable_not_working_for_random_string():
+ assert selective_enable() is False
+
+
+@conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_SELECTIVE_ENABLE): ""})
+def test_selective_enable_empty_conf_option():
+ assert selective_enable() is False
+
+
+@conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_SELECTIVE_ENABLE): None})
+def test_selective_enable_do_not_fail_if_conf_option_missing():
+ assert selective_enable() is False
+
+
@pytest.mark.parametrize(
("var_string", "expected"),
(