This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8bc6c32366 chore: Add more OpenLineage logs to facilitate debugging (#39136)
8bc6c32366 is described below

commit 8bc6c32366e723c897c0c4be3b3026c61314b519
Author: Kacper Muda <[email protected]>
AuthorDate: Mon May 13 18:54:48 2024 +0200

    chore: Add more OpenLineage logs to facilitate debugging (#39136)
    
    Signed-off-by: Kacper Muda <[email protected]>
---
 airflow/providers/openlineage/conf.py              | 17 +++---
 airflow/providers/openlineage/extractors/base.py   |  7 +++
 airflow/providers/openlineage/extractors/bash.py   |  4 ++
 .../providers/openlineage/extractors/manager.py    |  8 ++-
 airflow/providers/openlineage/extractors/python.py |  5 ++
 airflow/providers/openlineage/plugins/adapter.py   | 17 +++++-
 airflow/providers/openlineage/plugins/listener.py  | 59 ++++++++++++++++--
 airflow/providers/openlineage/provider.yaml        |  2 +-
 tests/providers/openlineage/test_conf.py           | 69 ++++++++++++++++++++++
 9 files changed, 172 insertions(+), 16 deletions(-)

diff --git a/airflow/providers/openlineage/conf.py b/airflow/providers/openlineage/conf.py
index 4ca42eedfd..d43806abca 100644
--- a/airflow/providers/openlineage/conf.py
+++ b/airflow/providers/openlineage/conf.py
@@ -26,6 +26,10 @@ from airflow.configuration import conf
 _CONFIG_SECTION = "openlineage"
 
 
+def _is_true(arg: Any) -> bool:
+    return str(arg).lower().strip() in ("true", "1", "t")
+
+
 @cache
 def config_path(check_legacy_env_var: bool = True) -> str:
     """[openlineage] config_path."""
@@ -41,7 +45,8 @@ def is_source_enabled() -> bool:
     option = conf.get(_CONFIG_SECTION, "disable_source_code", fallback="")
     if not option:
         option = os.getenv("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", "")
-    return option.lower() not in ("true", "1", "t")
+    # when disable_source_code is True, is_source_enabled() should be False
+    return not _is_true(option)
 
 
 @cache
@@ -53,7 +58,9 @@ def disabled_operators() -> set[str]:
 
 @cache
 def selective_enable() -> bool:
-    return conf.getboolean(_CONFIG_SECTION, "selective_enable", fallback=False)
+    """[openlineage] selective_enable."""
+    option = conf.get(_CONFIG_SECTION, "selective_enable", fallback="")
+    return _is_true(option)
 
 
 @cache
@@ -85,11 +92,7 @@ def transport() -> dict[str, Any]:
 
 @cache
 def is_disabled() -> bool:
-    """[openlineage] disabled + some extra checks."""
-
-    def _is_true(val):
-        return str(val).lower().strip() in ("true", "1", "t")
-
+    """[openlineage] disabled + check if any configuration is present."""
     option = conf.get(_CONFIG_SECTION, "disabled", fallback="")
     if _is_true(option):
         return True
diff --git a/airflow/providers/openlineage/extractors/base.py b/airflow/providers/openlineage/extractors/base.py
index 2f5f957b5b..6fa805b6fb 100644
--- a/airflow/providers/openlineage/extractors/base.py
+++ b/airflow/providers/openlineage/extractors/base.py
@@ -87,6 +87,9 @@ class DefaultExtractor(BaseExtractor):
     def _execute_extraction(self) -> OperatorLineage | None:
         # OpenLineage methods are optional - if there's no method, return None
         try:
+            self.log.debug(
+                "Trying to execute `get_openlineage_facets_on_start` for %s.", self.operator.task_type
+            )
             return self._get_openlineage_facets(self.operator.get_openlineage_facets_on_start)  # type: ignore
         except ImportError:
             self.log.error(
@@ -105,9 +108,13 @@ class DefaultExtractor(BaseExtractor):
         if task_instance.state == TaskInstanceState.FAILED:
             on_failed = getattr(self.operator, "get_openlineage_facets_on_failure", None)
             if on_failed and callable(on_failed):
+                self.log.debug(
+                    "Executing `get_openlineage_facets_on_failure` for %s.", self.operator.task_type
+                )
                 return self._get_openlineage_facets(on_failed, task_instance)
         on_complete = getattr(self.operator, "get_openlineage_facets_on_complete", None)
         if on_complete and callable(on_complete):
+            self.log.debug("Executing `get_openlineage_facets_on_complete` for %s.", self.operator.task_type)
             return self._get_openlineage_facets(on_complete, task_instance)
         return self.extract()
 
diff --git a/airflow/providers/openlineage/extractors/bash.py b/airflow/providers/openlineage/extractors/bash.py
index d0213fc6fb..39c3c10781 100644
--- a/airflow/providers/openlineage/extractors/bash.py
+++ b/airflow/providers/openlineage/extractors/bash.py
@@ -53,6 +53,10 @@ class BashExtractor(BaseExtractor):
                     source=self.operator.bash_command,
                 )
             }
+        else:
+            self.log.debug(
+                "OpenLineage disable_source_code option is on - no source code is extracted.",
+            )
 
         return OperatorLineage(
             job_facets=job_facets,
diff --git a/airflow/providers/openlineage/extractors/manager.py b/airflow/providers/openlineage/extractors/manager.py
index 2da7887556..d9bc49350d 100644
--- a/airflow/providers/openlineage/extractors/manager.py
+++ b/airflow/providers/openlineage/extractors/manager.py
@@ -65,12 +65,18 @@ class ExtractorManager(LoggingMixin):
             for operator_class in extractor.get_operator_classnames():
                 if operator_class in self.extractors:
                     self.log.debug(
-                        "Duplicate extractor found for `%s`. `%s` will be used instead of `%s`",
+                        "Duplicate OpenLineage custom extractor found for `%s`. "
+                        "`%s` will be used instead of `%s`",
                         operator_class,
                         extractor_path,
                         self.extractors[operator_class],
                     )
                 self.extractors[operator_class] = extractor
+                self.log.debug(
+                    "Registered custom OpenLineage extractor `%s` for class `%s`",
+                    extractor_path,
+                    operator_class,
+                )
 
     def add_extractor(self, operator_class: str, extractor: type[BaseExtractor]):
         self.extractors[operator_class] = extractor
diff --git a/airflow/providers/openlineage/extractors/python.py b/airflow/providers/openlineage/extractors/python.py
index 4209926623..8f7efad093 100644
--- a/airflow/providers/openlineage/extractors/python.py
+++ b/airflow/providers/openlineage/extractors/python.py
@@ -57,6 +57,11 @@ class PythonExtractor(BaseExtractor):
                     source=source_code,
                 )
             }
+        else:
+            self.log.debug(
+                "OpenLineage disable_source_code option is on - no source code is extracted.",
+            )
+
         return OperatorLineage(
             job_facets=job_facet,
             # The PythonOperator is recorded as an "unknownSource" even though we have an extractor,
diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py
index 7ee82b5893..d5aa8da759 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -73,8 +73,16 @@ class OpenLineageAdapter(LoggingMixin):
         if not self._client:
             config = self.get_openlineage_config()
             if config:
+                self.log.debug(
+                    "OpenLineage configuration found. Transport type: `%s`",
+                    config.get("type", "no type provided"),
+                )
                 self._client = OpenLineageClient.from_dict(config=config)
             else:
+                self.log.debug(
+                    "OpenLineage configuration not found directly in Airflow. "
+                    "Looking for legacy environment configuration. "
+                )
                 self._client = OpenLineageClient.from_environment()
         return self._client
 
@@ -85,13 +93,19 @@ class OpenLineageAdapter(LoggingMixin):
             config = self._read_yaml_config(openlineage_config_path)
             if config:
                 return config.get("transport", None)
+            self.log.debug("OpenLineage config file is empty: `%s`", openlineage_config_path)
+        else:
+            self.log.debug("OpenLineage config_path configuration not found.")
+
         # Second, try to get transport config
         transport_config = conf.transport()
         if not transport_config:
+            self.log.debug("OpenLineage transport configuration not found.")
             return None
         return transport_config
 
-    def _read_yaml_config(self, path: str) -> dict | None:
+    @staticmethod
+    def _read_yaml_config(path: str) -> dict | None:
         with open(path) as config_file:
             return yaml.safe_load(config_file)
 
@@ -125,6 +139,7 @@ class OpenLineageAdapter(LoggingMixin):
                 
stack.enter_context(Stats.timer(f"ol.emit.attempts.{event_type}.{transport_type}"))
                 stack.enter_context(Stats.timer("ol.emit.attempts"))
                 self._client.emit(redacted_event)
+                self.log.debug("Successfully emitted OpenLineage event of id %s", event.run.runId)
         except Exception as e:
             Stats.incr("ol.emit.failed")
             self.log.warning("Failed to emit OpenLineage event of id %s", event.run.runId)
diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 03c60059d6..9067d53f69 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -89,13 +89,19 @@ class OpenLineageListener:
         dag = task.dag
         if is_operator_disabled(task):
             self.log.debug(
-                "Skipping OpenLineage event emission for operator %s "
+                "Skipping OpenLineage event emission for operator `%s` "
                 "due to its presence in [openlineage] disabled_for_operators.",
                 task.task_type,
             )
-            return None
+            return
 
         if not is_selective_lineage_enabled(task):
+            self.log.debug(
+                "Skipping OpenLineage event emission for task `%s` "
+                "due to lack of explicit lineage enablement for task or DAG while "
+                "[openlineage] selective_enable is on.",
+                task.task_id,
+            )
             return
 
         @print_warning(self.log)
@@ -157,15 +163,22 @@ class OpenLineageListener:
         if TYPE_CHECKING:
             assert task
         dag = task.dag
+
         if is_operator_disabled(task):
             self.log.debug(
-                "Skipping OpenLineage event emission for operator %s "
+                "Skipping OpenLineage event emission for operator `%s` "
                 "due to its presence in [openlineage] disabled_for_operators.",
                 task.task_type,
             )
-            return None
+            return
 
         if not is_selective_lineage_enabled(task):
+            self.log.debug(
+                "Skipping OpenLineage event emission for task `%s` "
+                "due to lack of explicit lineage enablement for task or DAG while "
+                "[openlineage] selective_enable is on.",
+                task.task_id,
+            )
             return
 
         @print_warning(self.log)
@@ -212,15 +225,22 @@ class OpenLineageListener:
         if TYPE_CHECKING:
             assert task
         dag = task.dag
+
         if is_operator_disabled(task):
             self.log.debug(
-                "Skipping OpenLineage event emission for operator %s "
+                "Skipping OpenLineage event emission for operator `%s` "
                 "due to its presence in [openlineage] disabled_for_operators.",
                 task.task_type,
             )
-            return None
+            return
 
         if not is_selective_lineage_enabled(task):
+            self.log.debug(
+                "Skipping OpenLineage event emission for task `%s` "
+                "due to lack of explicit lineage enablement for task or DAG while "
+                "[openlineage] selective_enable is on.",
+                task.task_id,
+            )
             return
 
         @print_warning(self.log)
@@ -277,7 +297,18 @@ class OpenLineageListener:
     @hookimpl
     def on_dag_run_running(self, dag_run: DagRun, msg: str):
         if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
+            self.log.debug(
+                "Skipping OpenLineage event emission for DAG `%s` "
+                "due to lack of explicit lineage enablement for DAG while "
+                "[openlineage] selective_enable is on.",
+                dag_run.dag_id,
+            )
             return
+
+        if not self.executor:
+            self.log.debug("Executor have not started before `on_dag_run_running`")
+            return
+
         data_interval_start = dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None
         data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None
         self.executor.submit(
@@ -291,19 +322,35 @@ class OpenLineageListener:
     @hookimpl
     def on_dag_run_success(self, dag_run: DagRun, msg: str):
         if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
+            self.log.debug(
+                "Skipping OpenLineage event emission for DAG `%s` "
+                "due to lack of explicit lineage enablement for DAG while "
+                "[openlineage] selective_enable is on.",
+                dag_run.dag_id,
+            )
             return
+
         if not self.executor:
             self.log.debug("Executor have not started before `on_dag_run_success`")
             return
+
         self.executor.submit(self.adapter.dag_success, dag_run=dag_run, msg=msg)
 
     @hookimpl
     def on_dag_run_failed(self, dag_run: DagRun, msg: str):
         if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
+            self.log.debug(
+                "Skipping OpenLineage event emission for DAG `%s` "
+                "due to lack of explicit lineage enablement for DAG while "
+                "[openlineage] selective_enable is on.",
+                dag_run.dag_id,
+            )
             return
+
         if not self.executor:
             self.log.debug("Executor have not started before `on_dag_run_failed`")
             return
+
         self.executor.submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg)
 
 
diff --git a/airflow/providers/openlineage/provider.yaml b/airflow/providers/openlineage/provider.yaml
index afd3cf95d0..4df5f0d008 100644
--- a/airflow/providers/openlineage/provider.yaml
+++ b/airflow/providers/openlineage/provider.yaml
@@ -131,7 +131,7 @@ config:
           Disable the inclusion of source code in OpenLineage events by setting this to `true`.
           By default, several Operators (e.g. Python, Bash) will include their source code in the events
           unless disabled.
-        default: ~
+        default: "False"
         example: ~
         type: boolean
         version_added: ~
diff --git a/tests/providers/openlineage/test_conf.py b/tests/providers/openlineage/test_conf.py
index 95a7cde3e5..6271481689 100644
--- a/tests/providers/openlineage/test_conf.py
+++ b/tests/providers/openlineage/test_conf.py
@@ -22,12 +22,14 @@ from unittest import mock
 import pytest
 
 from airflow.providers.openlineage.conf import (
+    _is_true,
     config_path,
     custom_extractors,
     disabled_operators,
     is_disabled,
     is_source_enabled,
     namespace,
+    selective_enable,
     transport,
 )
 from tests.test_utils.config import conf_vars, env_vars
@@ -46,6 +48,22 @@ _CONFIG_OPTION_TRANSPORT = "transport"
 _VAR_DISABLED = "OPENLINEAGE_DISABLED"
 _CONFIG_OPTION_DISABLED = "disabled"
 _VAR_URL = "OPENLINEAGE_URL"
+_CONFIG_OPTION_SELECTIVE_ENABLE = "selective_enable"
+
+_BOOL_PARAMS = (
+    ("1", True),
+    ("t", True),
+    ("T", True),
+    ("tRuE ", True),
+    (" true", True),
+    ("TRUE", True),
+    ("0", False),
+    ("f", False),
+    ("F", False),
+    (" fAlSe", False),
+    ("false ", False),
+    ("FALSE", False),
+)
 
 
 @pytest.fixture(autouse=True)
@@ -57,6 +75,7 @@ def clear_cache():
     namespace.cache_clear()
     transport.cache_clear()
     is_disabled.cache_clear()
+    selective_enable.cache_clear()
     try:
         yield
     finally:
@@ -67,6 +86,21 @@ def clear_cache():
         namespace.cache_clear()
         transport.cache_clear()
         is_disabled.cache_clear()
+        selective_enable.cache_clear()
+
+
+@pytest.mark.parametrize(
+    ("var_string", "expected"),
+    (
+        *_BOOL_PARAMS,
+        ("some_string", False),
+        ("aasd123", False),
+        (True, True),
+        (False, False),
+    ),
+)
+def test_is_true(var_string, expected):
+    assert _is_true(var_string) is expected
 
 
 @env_vars({_VAR_CONFIG_PATH: "env_var_path"})
@@ -91,6 +125,16 @@ def test_config_path_do_not_fail_if_conf_option_missing():
     assert config_path() == ""
 
 
+@pytest.mark.parametrize(
+    ("var_string", "expected"),
+    _BOOL_PARAMS,
+)
+def test_disable_source_code(var_string, expected):
+    with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_DISABLE_SOURCE_CODE): var_string}):
+        result = is_source_enabled()
+        assert result is not expected  # conf is disabled_... and func is enabled_... hence the `not` here
+
+
 @env_vars({_VAR_DISABLE_SOURCE_CODE: "true"})
 @conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_DISABLE_SOURCE_CODE): None})
 def test_disable_source_code_legacy_env_var_is_used_when_no_conf_option_set():
@@ -124,6 +168,31 @@ def test_disable_source_code_do_not_fail_if_conf_option_missing():
     assert is_source_enabled() is True
 
 
+@pytest.mark.parametrize(
+    ("var_string", "expected"),
+    _BOOL_PARAMS,
+)
+def test_selective_enable(var_string, expected):
+    with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_SELECTIVE_ENABLE): var_string}):
+        result = selective_enable()
+        assert result is expected
+
+
+@conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_SELECTIVE_ENABLE): "asdadawlaksnd"})
+def test_selective_enable_not_working_for_random_string():
+    assert selective_enable() is False
+
+
+@conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_SELECTIVE_ENABLE): ""})
+def test_selective_enable_empty_conf_option():
+    assert selective_enable() is False
+
+
+@conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_SELECTIVE_ENABLE): None})
+def test_selective_enable_do_not_fail_if_conf_option_missing():
+    assert selective_enable() is False
+
+
 @pytest.mark.parametrize(
     ("var_string", "expected"),
     (

Reply via email to