This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d12eb43960 feat: add debug facet to all OpenLineage events (#41217)
d12eb43960 is described below

commit d12eb439603f896f22e4cd6f4e5daef22ae86254
Author: Kacper Muda <[email protected]>
AuthorDate: Mon Aug 12 11:30:49 2024 +0200

    feat: add debug facet to all OpenLineage events (#41217)
    
    Signed-off-by: Kacper Muda <[email protected]>
---
 airflow/providers/openlineage/conf.py              |  6 ++++
 .../openlineage/facets/AirflowDebugRunFacet.json   | 30 ++++++++++++++++
 airflow/providers/openlineage/plugins/adapter.py   |  6 ++--
 airflow/providers/openlineage/plugins/facets.py    |  7 ++++
 airflow/providers/openlineage/plugins/listener.py  |  7 ++++
 airflow/providers/openlineage/provider.yaml        |  8 +++++
 airflow/providers/openlineage/utils/utils.py       | 24 +++++++++++++
 .../guides/developer.rst                           | 21 +++++++++---
 .../guides/user.rst                                | 40 ++++++++++++++++++++--
 .../providers/openlineage/plugins/test_adapter.py  | 19 +++++++---
 .../providers/openlineage/plugins/test_listener.py | 22 ++++++++++--
 tests/providers/openlineage/plugins/test_utils.py  | 24 +++++++++++++
 tests/providers/openlineage/test_conf.py           | 34 ++++++++++++++++++
 13 files changed, 232 insertions(+), 16 deletions(-)

diff --git a/airflow/providers/openlineage/conf.py 
b/airflow/providers/openlineage/conf.py
index 562b673ed5..b0c763b280 100644
--- a/airflow/providers/openlineage/conf.py
+++ b/airflow/providers/openlineage/conf.py
@@ -145,3 +145,9 @@ def execution_timeout() -> int:
 def include_full_task_info() -> bool:
     """[openlineage] include_full_task_info."""
     return conf.getboolean(_CONFIG_SECTION, "include_full_task_info", 
fallback="False")
+
+
+@cache
+def debug_mode() -> bool:
+    """[openlineage] debug_mode."""
+    return conf.getboolean(_CONFIG_SECTION, "debug_mode", fallback="False")
diff --git a/airflow/providers/openlineage/facets/AirflowDebugRunFacet.json 
b/airflow/providers/openlineage/facets/AirflowDebugRunFacet.json
new file mode 100644
index 0000000000..6c3911d63e
--- /dev/null
+++ b/airflow/providers/openlineage/facets/AirflowDebugRunFacet.json
@@ -0,0 +1,30 @@
+{
+    "$schema": "https://json-schema.org/draft/2020-12/schema",
+    "$defs": {
+      "AirflowDebugRunFacet": {
+        "allOf": [
+          {
+            "$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet"
+          },
+          {
+            "type": "object",
+            "properties": {
+              "packages": {
+                "description": "The names and versions of all installed Python 
packages.",
+                "type": "object",
+                "additionalProperties": true
+              }
+            },
+            "required": ["packages"]
+          }
+        ],
+        "type": "object"
+      }
+    },
+    "type": "object",
+    "properties": {
+      "debug": {
+        "$ref": "#/$defs/AirflowDebugRunFacet"
+      }
+    }
+  }
diff --git a/airflow/providers/openlineage/plugins/adapter.py 
b/airflow/providers/openlineage/plugins/adapter.py
index 60aefc6102..70b237d53b 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -41,6 +41,7 @@ from airflow.providers.openlineage import __version__ as 
OPENLINEAGE_PROVIDER_VE
 from airflow.providers.openlineage.utils.utils import (
     OpenLineageRedactor,
     get_airflow_dag_run_facet,
+    get_airflow_debug_facet,
     get_airflow_state_run_facet,
 )
 from airflow.stats import Stats
@@ -361,7 +362,7 @@ class OpenLineageAdapter(LoggingMixin):
                     job_name=dag_run.dag_id,
                     nominal_start_time=nominal_start_time,
                     nominal_end_time=nominal_end_time,
-                    run_facets=get_airflow_dag_run_facet(dag_run),
+                    run_facets={**get_airflow_dag_run_facet(dag_run), 
**get_airflow_debug_facet()},
                 ),
                 inputs=[],
                 outputs=[],
@@ -385,7 +386,7 @@ class OpenLineageAdapter(LoggingMixin):
                         dag_id=dag_run.dag_id,
                         execution_date=dag_run.execution_date,
                     ),
-                    facets={**get_airflow_state_run_facet(dag_run)},
+                    facets={**get_airflow_state_run_facet(dag_run), 
**get_airflow_debug_facet()},
                 ),
                 inputs=[],
                 outputs=[],
@@ -414,6 +415,7 @@ class OpenLineageAdapter(LoggingMixin):
                             message=msg, programmingLanguage="python"
                         ),
                         **get_airflow_state_run_facet(dag_run),
+                        **get_airflow_debug_facet(),
                     },
                 ),
                 inputs=[],
diff --git a/airflow/providers/openlineage/plugins/facets.py 
b/airflow/providers/openlineage/plugins/facets.py
index d952cd0449..e35d312b70 100644
--- a/airflow/providers/openlineage/plugins/facets.py
+++ b/airflow/providers/openlineage/plugins/facets.py
@@ -101,6 +101,13 @@ class AirflowDagRunFacet(RunFacet):
     dagRun: dict
 
 
+@define
+class AirflowDebugRunFacet(RunFacet):
+    """Airflow Debug run facet."""
+
+    packages: dict
+
+
 @define
 class UnknownOperatorInstance(RedactMixin):
     """
diff --git a/airflow/providers/openlineage/plugins/listener.py 
b/airflow/providers/openlineage/plugins/listener.py
index 9da9267db3..6c59472ca7 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -32,6 +32,7 @@ from airflow.providers.openlineage.extractors import 
ExtractorManager
 from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, 
RunState
 from airflow.providers.openlineage.utils.utils import (
     IS_AIRFLOW_2_10_OR_HIGHER,
+    get_airflow_debug_facet,
     get_airflow_job_facet,
     get_airflow_mapped_task_facet,
     get_airflow_run_facet,
@@ -122,6 +123,9 @@ class OpenLineageListener:
             )
             return
 
+        # Needs to be calculated outside of inner method so that it gets 
cached for usage in fork processes
+        debug_facet = get_airflow_debug_facet()
+
         @print_warning(self.log)
         def on_running():
             # that's a workaround to detect task running from deferred state
@@ -166,6 +170,7 @@ class OpenLineageListener:
                     **get_user_provided_run_facets(task_instance, 
TaskInstanceState.RUNNING),
                     **get_airflow_mapped_task_facet(task_instance),
                     **get_airflow_run_facet(dagrun, dag, task_instance, task, 
task_uuid),
+                    **debug_facet,
                 },
             )
             Stats.gauge(
@@ -237,6 +242,7 @@ class OpenLineageListener:
                 run_facets={
                     **get_user_provided_run_facets(task_instance, 
TaskInstanceState.SUCCESS),
                     **get_airflow_run_facet(dagrun, dag, task_instance, task, 
task_uuid),
+                    **get_airflow_debug_facet(),
                 },
             )
             Stats.gauge(
@@ -336,6 +342,7 @@ class OpenLineageListener:
                 run_facets={
                     **get_user_provided_run_facets(task_instance, 
TaskInstanceState.FAILED),
                     **get_airflow_run_facet(dagrun, dag, task_instance, task, 
task_uuid),
+                    **get_airflow_debug_facet(),
                 },
             )
             Stats.gauge(
diff --git a/airflow/providers/openlineage/provider.yaml 
b/airflow/providers/openlineage/provider.yaml
index af5c15fce0..61004c25b5 100644
--- a/airflow/providers/openlineage/provider.yaml
+++ b/airflow/providers/openlineage/provider.yaml
@@ -168,3 +168,11 @@ config:
         example: ~
         type: boolean
         version_added: 1.10.0
+      debug_mode:
+        description: |
+          If true, OpenLineage events will include information useful for 
debugging - potentially
+          containing large fields e.g. all installed packages and their 
versions.
+        default: "False"
+        example: ~
+        type: boolean
+        version_added: 1.11.0
diff --git a/airflow/providers/openlineage/utils/utils.py 
b/airflow/providers/openlineage/utils/utils.py
index 0e1a4fac73..dd070d6fa5 100644
--- a/airflow/providers/openlineage/utils/utils.py
+++ b/airflow/providers/openlineage/utils/utils.py
@@ -23,6 +23,7 @@ import logging
 import re
 from contextlib import redirect_stdout, suppress
 from functools import wraps
+from importlib import metadata
 from io import StringIO
 from typing import TYPE_CHECKING, Any, Callable, Iterable
 
@@ -38,6 +39,7 @@ from airflow.models import DAG, BaseOperator, MappedOperator
 from airflow.providers.openlineage import conf
 from airflow.providers.openlineage.plugins.facets import (
     AirflowDagRunFacet,
+    AirflowDebugRunFacet,
     AirflowJobFacet,
     AirflowMappedTaskRunFacet,
     AirflowRunFacet,
@@ -378,6 +380,28 @@ def get_airflow_dag_run_facet(dag_run: DagRun) -> 
dict[str, RunFacet]:
     }
 
 
+@conf.cache
+def _get_all_packages_installed() -> dict[str, str]:
+    """
+    Retrieve a dictionary of all installed packages and their versions.
+
+    This operation involves scanning the system's installed packages, which 
can be a heavy operation.
+    It is recommended to cache the result to avoid repeated, expensive lookups.
+    """
+    return {dist.metadata["Name"]: dist.version for dist in 
metadata.distributions()}
+
+
+def get_airflow_debug_facet() -> dict[str, AirflowDebugRunFacet]:
+    if not conf.debug_mode():
+        return {}
+    log.warning("OpenLineage debug_mode is enabled. Be aware that this may log 
and emit extensive details.")
+    return {
+        "debug": AirflowDebugRunFacet(
+            packages=_get_all_packages_installed(),
+        )
+    }
+
+
 def get_airflow_run_facet(
     dag_run: DagRun,
     dag: DAG,
diff --git a/docs/apache-airflow-providers-openlineage/guides/developer.rst 
b/docs/apache-airflow-providers-openlineage/guides/developer.rst
index 806f2c2145..9686b60347 100644
--- a/docs/apache-airflow-providers-openlineage/guides/developer.rst
+++ b/docs/apache-airflow-providers-openlineage/guides/developer.rst
@@ -137,7 +137,7 @@ Authors of tests need to remember the condition of calling 
different OL methods
 ``get_openlineage_facets_on_start`` is called before ``execute``, and as such, 
must not depend on values
 that are set there.
 
-See :ref:`local_troubleshooting:openlineage` for details on how to 
troubleshoot OpenLineage locally.
+See :ref:`troubleshooting:openlineage` for details on how to troubleshoot 
OpenLineage locally.
 
 There is no existing framework for system testing OpenLineage integration, but 
the easiest way it can be achieved is
 by comparing emitted events (f.e. with ``FileTransport``) against expected 
ones.
@@ -299,7 +299,7 @@ creating a gap in pipeline observability.
 Even with unit tests, an Extractor may still not be operating as expected.
 The easiest way to tell if data isn't coming through correctly is if the UI 
elements are not showing up correctly in the Lineage tab.
 
-See :ref:`local_troubleshooting:openlineage` for details on how to 
troubleshoot OpenLineage locally.
+See :ref:`troubleshooting:openlineage` for details on how to troubleshoot 
OpenLineage locally.
 
 Example
 ^^^^^^^
@@ -573,9 +573,9 @@ OpenLineage reflects this structure in its Job Hierarchy 
model.
 
 TaskInstance events' ParentRunFacet references the originating DAG run.
 
-.. _local_troubleshooting:openlineage:
+.. _troubleshooting:openlineage:
 
-Local troubleshooting
+Troubleshooting
 =====================
 
 When testing code locally, `Marquez 
<https://marquezproject.ai/docs/quickstart>`_ can be used to inspect the data 
being emitted—or not being emitted.
@@ -585,6 +585,19 @@ then the Extractor is fine and an issue should be opened 
up in OpenLineage. Howe
 it is likely that more unit tests are needed to cover Extractor behavior.
 Marquez can help you pinpoint which facets are not being formed properly so 
you know where to add test coverage.
 
+Debug settings
+^^^^^^^^^^^^^^
+For debugging purposes, ensure that the `Airflow logging level 
<https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#logging-level>`_
+is set to ``DEBUG`` and that the :ref:`debug_mode <options:debug_mode>` is 
enabled for OpenLineage integration.
+This will increase the detail in Airflow logs and include additional 
environmental information in OpenLineage events.
+
+When seeking help with debugging, always try to provide the following:
+
+-    Airflow scheduler logs with the logging level set to DEBUG
+-    Airflow worker logs (task logs) with the logging level set to DEBUG
+-    OpenLineage events with debug_mode enabled
+
+
 Where can I learn more?
 =======================
 
diff --git a/docs/apache-airflow-providers-openlineage/guides/user.rst 
b/docs/apache-airflow-providers-openlineage/guides/user.rst
index a5ced101f0..2c299b8c6d 100644
--- a/docs/apache-airflow-providers-openlineage/guides/user.rst
+++ b/docs/apache-airflow-providers-openlineage/guides/user.rst
@@ -189,8 +189,6 @@ If not set, it's using ``default`` namespace. Provide the 
name of the namespace
 
   AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'
 
-.. _options:disable:
-
 Timeout
 ^^^^^^^
 
@@ -210,6 +208,7 @@ The code runs with default timeout of 10 seconds. You can 
increase this by setti
 
   AIRFLOW__OPENLINEAGE__EXECUTION_TIMEOUT=60
 
+.. _options:disable:
 
 Disable
 ^^^^^^^
@@ -283,6 +282,10 @@ serializing only a few known attributes, we exclude 
certain non-serializable ele
 
 ``AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO`` environment variable is an 
equivalent.
 
+.. code-block:: ini
+
+  AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO=true
+
 .. warning::
 
   By setting this variable to true, OpenLineage integration does not control 
the size of event you sent. It can potentially include elements that are 
megabytes in size or larger, depending on the size of data you pass to the task.
@@ -324,6 +327,31 @@ a string of semicolon separated full import paths to 
``custom_run_facets`` optio
 
   
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'
 
+.. _options:debug_mode:
+
+Debug Mode
+^^^^^^^^^^
+
+You can enable sending additional information in OpenLineage events that can 
be useful for debugging and
+reproducing your environment setup by setting ``debug_mode`` option to 
``true`` in Airflow configuration.
+
+.. code-block:: ini
+
+    [openlineage]
+    transport = {"type": "http", "url": "http://example.com:5000", "endpoint": 
"api/v1/lineage"}
+    debug_mode = true
+
+``AIRFLOW__OPENLINEAGE__DEBUG_MODE`` environment variable is an equivalent.
+
+.. code-block:: ini
+
+  AIRFLOW__OPENLINEAGE__DEBUG_MODE=true
+
+.. warning::
+
+  By setting this variable to true, OpenLineage integration may log and emit 
extensive details. It should only be enabled temporarily for debugging purposes.
+
+
 Enabling OpenLineage on DAG/task level
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
@@ -335,6 +363,12 @@ To enable this policy, set the ``selective_enable`` option 
to True in the [openl
     [openlineage]
     selective_enable = True
 
+``AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE`` environment variable is an 
equivalent.
+
+.. code-block:: ini
+
+  AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE=true
+
 
 While ``selective_enable`` enables selective control, the ``disabled`` 
:ref:`option <options:disable>` still has precedence.
 If you set ``disabled`` to True in the configuration, OpenLineage will be 
disabled for all DAGs and tasks regardless of the ``selective_enable`` setting.
@@ -383,7 +417,7 @@ Disabling DAG-level lineage while enabling task-level 
lineage might cause errors
 Troubleshooting
 ===============
 
-See :ref:`local_troubleshooting:openlineage` for details on how to 
troubleshoot OpenLineage locally.
+See :ref:`troubleshooting:openlineage` for details on how to troubleshoot 
OpenLineage.
 
 
 Adding support for custom Operators
diff --git a/tests/providers/openlineage/plugins/test_adapter.py 
b/tests/providers/openlineage/plugins/test_adapter.py
index 60923ffbe2..18d7c0c8d9 100644
--- a/tests/providers/openlineage/plugins/test_adapter.py
+++ b/tests/providers/openlineage/plugins/test_adapter.py
@@ -47,6 +47,7 @@ from airflow.providers.openlineage.extractors import 
OperatorLineage
 from airflow.providers.openlineage.plugins.adapter import _PRODUCER, 
OpenLineageAdapter
 from airflow.providers.openlineage.plugins.facets import (
     AirflowDagRunFacet,
+    AirflowDebugRunFacet,
     AirflowStateRunFacet,
 )
 from airflow.providers.openlineage.utils.utils import get_airflow_job_facet
@@ -527,10 +528,11 @@ def 
test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta
     mock_stats_timer.assert_called_with("ol.emit.attempts")
 
 
[email protected]("airflow.providers.openlineage.conf.debug_mode", return_value=True)
 
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
-def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, 
generate_static_uuid):
+def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, 
generate_static_uuid, mock_debug_mode):
     random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
     client = MagicMock()
     adapter = OpenLineageAdapter(client)
@@ -600,6 +602,7 @@ def test_emit_dag_started_event(mock_stats_incr, 
mock_stats_timer, generate_stat
                                 "start_date": event_time.isoformat(),
                             },
                         ),
+                        "debug": AirflowDebugRunFacet(packages=ANY),
                     },
                 ),
                 job=Job(
@@ -630,11 +633,14 @@ def test_emit_dag_started_event(mock_stats_incr, 
mock_stats_timer, generate_stat
     mock_stats_timer.assert_called_with("ol.emit.attempts")
 
 
[email protected]("airflow.providers.openlineage.conf.debug_mode", return_value=True)
 @mock.patch.object(DagRun, "get_task_instances")
 
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
-def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, 
generate_static_uuid, mocked_get_tasks):
+def test_emit_dag_complete_event(
+    mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_get_tasks, 
mock_debug_mode
+):
     random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
     client = MagicMock()
     adapter = OpenLineageAdapter(client)
@@ -684,7 +690,8 @@ def test_emit_dag_complete_event(mock_stats_incr, 
mock_stats_timer, generate_sta
                                 task_1.task_id: TaskInstanceState.SKIPPED,
                                 task_2.task_id: TaskInstanceState.FAILED,
                             },
-                        )
+                        ),
+                        "debug": AirflowDebugRunFacet(packages=ANY),
                     },
                 ),
                 job=Job(
@@ -708,11 +715,14 @@ def test_emit_dag_complete_event(mock_stats_incr, 
mock_stats_timer, generate_sta
     mock_stats_timer.assert_called_with("ol.emit.attempts")
 
 
[email protected]("airflow.providers.openlineage.conf.debug_mode", return_value=True)
 @mock.patch.object(DagRun, "get_task_instances")
 
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
-def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, 
generate_static_uuid, mocked_get_tasks):
+def test_emit_dag_failed_event(
+    mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_get_tasks, 
mock_debug_mode
+):
     random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
     client = MagicMock()
     adapter = OpenLineageAdapter(client)
@@ -764,6 +774,7 @@ def test_emit_dag_failed_event(mock_stats_incr, 
mock_stats_timer, generate_stati
                                 task_2.task_id: TaskInstanceState.FAILED,
                             },
                         ),
+                        "debug": AirflowDebugRunFacet(packages=ANY),
                     },
                 ),
                 job=Job(
diff --git a/tests/providers/openlineage/plugins/test_listener.py 
b/tests/providers/openlineage/plugins/test_listener.py
index 3b0c9f0159..b7004ac392 100644
--- a/tests/providers/openlineage/plugins/test_listener.py
+++ b/tests/providers/openlineage/plugins/test_listener.py
@@ -21,7 +21,7 @@ import uuid
 from contextlib import suppress
 from typing import Callable
 from unittest import mock
-from unittest.mock import patch
+from unittest.mock import ANY, patch
 
 import pandas as pd
 import pytest
@@ -29,6 +29,7 @@ import pytest
 from airflow.models import DAG, DagRun, TaskInstance
 from airflow.models.baseoperator import BaseOperator
 from airflow.operators.python import PythonOperator
+from airflow.providers.openlineage.plugins.facets import AirflowDebugRunFacet
 from airflow.providers.openlineage.plugins.listener import OpenLineageListener
 from airflow.providers.openlineage.utils.selective_enable import 
disable_lineage, enable_lineage
 from airflow.utils.state import State
@@ -213,6 +214,7 @@ def _create_listener_and_task_instance() -> 
tuple[OpenLineageListener, TaskInsta
     return listener, task_instance
 
 
[email protected]("airflow.providers.openlineage.conf.debug_mode", return_value=True)
 
@mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled")
 
@mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet")
 
@mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_mapped_task_facet")
@@ -225,6 +227,7 @@ def test_adapter_start_task_is_called_with_proper_arguments(
     mock_get_user_provided_run_facets,
     mock_get_airflow_run_facet,
     mock_disabled,
+    mock_debug_mode,
 ):
     """Tests that the 'start_task' method of the OpenLineageAdapter is invoked 
with the correct arguments.
 
@@ -258,10 +261,12 @@ def 
test_adapter_start_task_is_called_with_proper_arguments(
             "mapped_facet": 1,
             "custom_user_facet": 2,
             "airflow_run_facet": 3,
+            "debug": AirflowDebugRunFacet(packages=ANY),
         },
     )
 
 
[email protected]("airflow.providers.openlineage.conf.debug_mode", return_value=True)
 
@mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled")
 
@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageAdapter")
 
@mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet")
@@ -274,6 +279,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments(
     mock_get_airflow_run_facet,
     mocked_adapter,
     mock_disabled,
+    mock_debug_mode,
 ):
     """Tests that the 'fail_task' method of the OpenLineageAdapter is invoked 
with the correct arguments.
 
@@ -311,11 +317,16 @@ def 
test_adapter_fail_task_is_called_with_proper_arguments(
         parent_run_id="execution_date.dag_id",
         run_id="execution_date.dag_id.task_id.1",
         task=listener.extractor_manager.extract_metadata(),
-        run_facets={"custom_user_facet": 2, "airflow": {"task": "..."}},
+        run_facets={
+            "custom_user_facet": 2,
+            "airflow": {"task": "..."},
+            "debug": AirflowDebugRunFacet(packages=ANY),
+        },
         **expected_err_kwargs,
     )
 
 
[email protected]("airflow.providers.openlineage.conf.debug_mode", return_value=True)
 
@mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled")
 
@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageAdapter")
 
@mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet")
@@ -328,6 +339,7 @@ def 
test_adapter_complete_task_is_called_with_proper_arguments(
     mock_get_airflow_run_facet,
     mocked_adapter,
     mock_disabled,
+    mock_debug_mode,
 ):
     """Tests that the 'complete_task' method of the OpenLineageAdapter is 
called with the correct arguments.
 
@@ -364,7 +376,11 @@ def 
test_adapter_complete_task_is_called_with_proper_arguments(
         parent_run_id="execution_date.dag_id",
         run_id=f"execution_date.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}",
         task=listener.extractor_manager.extract_metadata(),
-        run_facets={"custom_user_facet": 2, "airflow": {"task": "..."}},
+        run_facets={
+            "custom_user_facet": 2,
+            "airflow": {"task": "..."},
+            "debug": AirflowDebugRunFacet(packages=ANY),
+        },
     )
 
 
diff --git a/tests/providers/openlineage/plugins/test_utils.py 
b/tests/providers/openlineage/plugins/test_utils.py
index 962429e30e..0be62a5b0c 100644
--- a/tests/providers/openlineage/plugins/test_utils.py
+++ b/tests/providers/openlineage/plugins/test_utils.py
@@ -30,10 +30,13 @@ from pkg_resources import parse_version
 
 from airflow.models import DAG as AIRFLOW_DAG, DagModel
 from airflow.operators.bash import BashOperator
+from airflow.providers.openlineage.plugins.facets import AirflowDebugRunFacet
 from airflow.providers.openlineage.utils.utils import (
     InfoJsonEncodable,
     OpenLineageRedactor,
+    _get_all_packages_installed,
     _is_name_redactable,
+    get_airflow_debug_facet,
     get_airflow_run_facet,
     get_fully_qualified_class_name,
     is_operator_disabled,
@@ -55,6 +58,27 @@ class SafeStrDict(dict):
         return str(dict(castable))
 
 
+@patch("airflow.providers.openlineage.utils.utils.metadata.distributions")
+def test_get_all_packages_installed(mock_distributions):
+    mock_distributions.return_value = [MagicMock(metadata={"Name": 
"package1"}, version="1.0.0")]
+    assert _get_all_packages_installed() == {"package1": "1.0.0"}
+
+
+@patch("airflow.providers.openlineage.utils.utils.conf.debug_mode", 
return_value=False)
+def test_get_airflow_debug_facet_not_in_debug_mode(mock_debug_mode):
+    assert get_airflow_debug_facet() == {}
+
+
+@patch("airflow.providers.openlineage.utils.utils._get_all_packages_installed")
+@patch("airflow.providers.openlineage.utils.utils.conf.debug_mode")
+def test_get_airflow_debug_facet_logging_set_to_debug(mock_debug_mode, 
mock_get_packages):
+    mock_debug_mode.return_value = True
+    mock_get_packages.return_value = {"package1": "1.0.0"}
+    result = get_airflow_debug_facet()
+    expected_result = {"debug": AirflowDebugRunFacet(packages={"package1": 
"1.0.0"})}
+    assert result == expected_result
+
+
 @pytest.mark.db_test
 def test_get_dagrun_start_end():
     start_date = datetime.datetime(2022, 1, 1)
diff --git a/tests/providers/openlineage/test_conf.py 
b/tests/providers/openlineage/test_conf.py
index 7eeea35db7..f3c483d4ce 100644
--- a/tests/providers/openlineage/test_conf.py
+++ b/tests/providers/openlineage/test_conf.py
@@ -28,6 +28,7 @@ from airflow.providers.openlineage.conf import (
     custom_extractors,
     custom_run_facets,
     dag_state_change_process_pool_size,
+    debug_mode,
     disabled_operators,
     execution_timeout,
     include_full_task_info,
@@ -58,6 +59,7 @@ _VAR_URL = "OPENLINEAGE_URL"
 _CONFIG_OPTION_SELECTIVE_ENABLE = "selective_enable"
 _CONFIG_OPTION_DAG_STATE_CHANGE_PROCESS_POOL_SIZE = 
"dag_state_change_process_pool_size"
 _CONFIG_OPTION_INCLUDE_FULL_TASK_INFO = "include_full_task_info"
+_CONFIG_OPTION_DEBUG_MODE = "debug_mode"
 
 _BOOL_PARAMS = (
     ("1", True),
@@ -577,3 +579,35 @@ def 
test_include_full_task_info_invalid_value_raise_error(var_string):
 @conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_INCLUDE_FULL_TASK_INFO): None})
 def test_include_full_task_info_do_not_fail_if_conf_option_missing():
     assert include_full_task_info() is False
+
+
[email protected](
+    ("var_string", "expected"),
+    _BOOL_PARAMS,
+)
+def test_debug_mode(var_string, expected):
+    with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_DEBUG_MODE): var_string}):
+        result = debug_mode()
+        assert result is expected
+
+
[email protected](
+    "var_string",
+    (
+        "a",
+        "asdf",
+        "None",
+        "31",
+        "",
+        " ",
+    ),
+)
+def test_debug_mode_invalid_value_raise_error(var_string):
+    with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_DEBUG_MODE): var_string}):
+        with pytest.raises(AirflowConfigException):
+            debug_mode()
+
+
+@conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_DEBUG_MODE): None})
+def test_debug_mode_do_not_fail_if_conf_option_missing():
+    assert debug_mode() is False

Reply via email to