This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new aa23bfdbc7 feat: notify about potential serialization failures when 
sending DagRun, don't serialize unnecessary params, guard listener for 
exceptions (#41690)
aa23bfdbc7 is described below

commit aa23bfdbc735645b2cdeda4bb1360b60ae60e6e1
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Mon Sep 2 15:59:13 2024 +0200

    feat: notify about potential serialization failures when sending DagRun, 
don't serialize unnecessary params, guard listener for exceptions (#41690)
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
---
 airflow/providers/openlineage/plugins/adapter.py   |  72 +++++---
 airflow/providers/openlineage/plugins/listener.py  | 149 ++++++++++-----
 airflow/providers/openlineage/utils/utils.py       |  16 +-
 .../providers/openlineage/plugins/test_adapter.py  | 202 +++++++++++----------
 .../providers/openlineage/plugins/test_listener.py |  31 +++-
 5 files changed, 299 insertions(+), 171 deletions(-)

diff --git a/airflow/providers/openlineage/plugins/adapter.py 
b/airflow/providers/openlineage/plugins/adapter.py
index 847de5953d..8cd6e6c605 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -40,7 +40,6 @@ from openlineage.client.uuid import generate_static_uuid
 from airflow.providers.openlineage import __version__ as 
OPENLINEAGE_PROVIDER_VERSION, conf
 from airflow.providers.openlineage.utils.utils import (
     OpenLineageRedactor,
-    get_airflow_dag_run_facet,
     get_airflow_debug_facet,
     get_airflow_state_run_facet,
 )
@@ -50,9 +49,9 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 if TYPE_CHECKING:
     from datetime import datetime
 
-    from airflow.models.dagrun import DagRun
     from airflow.providers.openlineage.extractors import OperatorLineage
     from airflow.utils.log.secrets_masker import SecretsMasker
+    from airflow.utils.state import DagRunState
 
 _PRODUCER = 
f"https://github.com/apache/airflow/tree/providers-openlineage/{OPENLINEAGE_PROVIDER_VERSION}";
 
@@ -336,33 +335,36 @@ class OpenLineageAdapter(LoggingMixin):
 
     def dag_started(
         self,
-        dag_run: DagRun,
-        msg: str,
+        dag_id: str,
+        logical_date: datetime,
+        start_date: datetime,
         nominal_start_time: str,
         nominal_end_time: str,
+        owners: list[str],
+        run_facets: dict[str, RunFacet],
+        description: str | None = None,
         job_facets: dict[str, JobFacet] | None = None,  # Custom job facets
     ):
         try:
-            owner = [x.strip() for x in dag_run.dag.owner.split(",")] if 
dag_run.dag else None
             event = RunEvent(
                 eventType=RunState.START,
-                eventTime=dag_run.start_date.isoformat(),
+                eventTime=start_date.isoformat(),
                 job=self._build_job(
-                    job_name=dag_run.dag_id,
+                    job_name=dag_id,
                     job_type=_JOB_TYPE_DAG,
-                    job_description=dag_run.dag.description if dag_run.dag 
else None,
-                    owners=owner,
+                    job_description=description,
+                    owners=owners,
                     job_facets=job_facets,
                 ),
                 run=self._build_run(
                     run_id=self.build_dag_run_id(
-                        dag_id=dag_run.dag_id,
-                        logical_date=dag_run.logical_date,
+                        dag_id=dag_id,
+                        logical_date=logical_date,
                     ),
-                    job_name=dag_run.dag_id,
+                    job_name=dag_id,
                     nominal_start_time=nominal_start_time,
                     nominal_end_time=nominal_end_time,
-                    run_facets={**get_airflow_dag_run_facet(dag_run), 
**get_airflow_debug_facet()},
+                    run_facets={**run_facets, **get_airflow_debug_facet()},
                 ),
                 inputs=[],
                 outputs=[],
@@ -375,18 +377,29 @@ class OpenLineageAdapter(LoggingMixin):
             # This part cannot be wrapped to deduplicate code, otherwise the 
method cannot be pickled in multiprocessing.
             self.log.warning("Failed to emit DAG started event: \n %s", 
traceback.format_exc())
 
-    def dag_success(self, dag_run: DagRun, msg: str):
+    def dag_success(
+        self,
+        dag_id: str,
+        run_id: str,
+        end_date: datetime,
+        logical_date: datetime,
+        dag_run_state: DagRunState,
+        task_ids: list[str],
+    ):
         try:
             event = RunEvent(
                 eventType=RunState.COMPLETE,
-                eventTime=dag_run.end_date.isoformat(),
-                job=self._build_job(job_name=dag_run.dag_id, 
job_type=_JOB_TYPE_DAG),
+                eventTime=end_date.isoformat(),
+                job=self._build_job(job_name=dag_id, job_type=_JOB_TYPE_DAG),
                 run=Run(
                     runId=self.build_dag_run_id(
-                        dag_id=dag_run.dag_id,
-                        logical_date=dag_run.logical_date,
+                        dag_id=dag_id,
+                        logical_date=logical_date,
                     ),
-                    facets={**get_airflow_state_run_facet(dag_run), 
**get_airflow_debug_facet()},
+                    facets={
+                        **get_airflow_state_run_facet(dag_id, run_id, 
task_ids, dag_run_state),
+                        **get_airflow_debug_facet(),
+                    },
                 ),
                 inputs=[],
                 outputs=[],
@@ -399,22 +412,31 @@ class OpenLineageAdapter(LoggingMixin):
             # This part cannot be wrapped to deduplicate code, otherwise the 
method cannot be pickled in multiprocessing.
             self.log.warning("Failed to emit DAG success event: \n %s", 
traceback.format_exc())
 
-    def dag_failed(self, dag_run: DagRun, msg: str):
+    def dag_failed(
+        self,
+        dag_id: str,
+        run_id: str,
+        end_date: datetime,
+        logical_date: datetime,
+        dag_run_state: DagRunState,
+        task_ids: list[str],
+        msg: str,
+    ):
         try:
             event = RunEvent(
                 eventType=RunState.FAIL,
-                eventTime=dag_run.end_date.isoformat(),
-                job=self._build_job(job_name=dag_run.dag_id, 
job_type=_JOB_TYPE_DAG),
+                eventTime=end_date.isoformat(),
+                job=self._build_job(job_name=dag_id, job_type=_JOB_TYPE_DAG),
                 run=Run(
                     runId=self.build_dag_run_id(
-                        dag_id=dag_run.dag_id,
-                        logical_date=dag_run.logical_date,
+                        dag_id=dag_id,
+                        logical_date=logical_date,
                     ),
                     facets={
                         "errorMessage": error_message_run.ErrorMessageRunFacet(
                             message=msg, programmingLanguage="python"
                         ),
-                        **get_airflow_state_run_facet(dag_run),
+                        **get_airflow_state_run_facet(dag_id, run_id, 
task_ids, dag_run_state),
                         **get_airflow_debug_facet(),
                     },
                 ),
diff --git a/airflow/providers/openlineage/plugins/listener.py 
b/airflow/providers/openlineage/plugins/listener.py
index 7882e188d9..fbe50e4a72 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -27,11 +27,13 @@ from setproctitle import getproctitle, setproctitle
 
 from airflow import settings
 from airflow.listeners import hookimpl
+from airflow.models import DagRun
 from airflow.providers.openlineage import conf
 from airflow.providers.openlineage.extractors import ExtractorManager
 from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, 
RunState
 from airflow.providers.openlineage.utils.utils import (
     IS_AIRFLOW_2_10_OR_HIGHER,
+    get_airflow_dag_run_facet,
     get_airflow_debug_facet,
     get_airflow_job_facet,
     get_airflow_mapped_task_facet,
@@ -51,7 +53,7 @@ from airflow.utils.timeout import timeout
 if TYPE_CHECKING:
     from sqlalchemy.orm import Session
 
-    from airflow.models import DagRun, TaskInstance
+    from airflow.models import TaskInstance
 
 _openlineage_listener: OpenLineageListener | None = None
 
@@ -413,65 +415,120 @@ class OpenLineageListener:
 
     @hookimpl
     def on_dag_run_running(self, dag_run: DagRun, msg: str) -> None:
-        if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
-            self.log.debug(
-                "Skipping OpenLineage event emission for DAG `%s` "
-                "due to lack of explicit lineage enablement for DAG while "
-                "[openlineage] selective_enable is on.",
-                dag_run.dag_id,
+        try:
+            if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
+                self.log.debug(
+                    "Skipping OpenLineage event emission for DAG `%s` "
+                    "due to lack of explicit lineage enablement for DAG while "
+                    "[openlineage] selective_enable is on.",
+                    dag_run.dag_id,
+                )
+                return
+
+            if not self.executor:
+                self.log.debug("Executor have not started before 
`on_dag_run_running`")
+                return
+
+            data_interval_start = (
+                dag_run.data_interval_start.isoformat() if 
dag_run.data_interval_start else None
             )
-            return
+            data_interval_end = dag_run.data_interval_end.isoformat() if 
dag_run.data_interval_end else None
 
-        if not self.executor:
-            self.log.debug("Executor have not started before 
`on_dag_run_running`")
-            return
+            run_facets = {**get_airflow_dag_run_facet(dag_run)}
 
-        data_interval_start = dag_run.data_interval_start.isoformat() if 
dag_run.data_interval_start else None
-        data_interval_end = dag_run.data_interval_end.isoformat() if 
dag_run.data_interval_end else None
-        self.executor.submit(
-            self.adapter.dag_started,
-            dag_run=dag_run,
-            msg=msg,
-            nominal_start_time=data_interval_start,
-            nominal_end_time=data_interval_end,
-            # AirflowJobFacet should be created outside ProcessPoolExecutor 
that pickles objects,
-            # as it causes lack of some TaskGroup attributes and crashes event 
emission.
-            job_facets=get_airflow_job_facet(dag_run=dag_run),
-        )
+            self.submit_callable(
+                self.adapter.dag_started,
+                dag_id=dag_run.dag_id,
+                run_id=dag_run.run_id,
+                logical_date=dag_run.logical_date,
+                start_date=dag_run.start_date,
+                nominal_start_time=data_interval_start,
+                nominal_end_time=data_interval_end,
+                run_facets=run_facets,
+                owners=[x.strip() for x in dag_run.dag.owner.split(",")] if 
dag_run.dag else None,
+                description=dag_run.dag.description if dag_run.dag else None,
+                # AirflowJobFacet should be created outside 
ProcessPoolExecutor that pickles objects,
+                # as it causes lack of some TaskGroup attributes and crashes 
event emission.
+                job_facets=get_airflow_job_facet(dag_run=dag_run),
+            )
+        except BaseException as e:
+            self.log.warning("OpenLineage received exception in method 
on_dag_run_running", exc_info=e)
 
     @hookimpl
     def on_dag_run_success(self, dag_run: DagRun, msg: str) -> None:
-        if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
-            self.log.debug(
-                "Skipping OpenLineage event emission for DAG `%s` "
-                "due to lack of explicit lineage enablement for DAG while "
-                "[openlineage] selective_enable is on.",
-                dag_run.dag_id,
-            )
-            return
+        try:
+            if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
+                self.log.debug(
+                    "Skipping OpenLineage event emission for DAG `%s` "
+                    "due to lack of explicit lineage enablement for DAG while "
+                    "[openlineage] selective_enable is on.",
+                    dag_run.dag_id,
+                )
+                return
 
-        if not self.executor:
-            self.log.debug("Executor have not started before 
`on_dag_run_success`")
-            return
+            if not self.executor:
+                self.log.debug("Executor have not started before 
`on_dag_run_success`")
+                return
 
-        self.executor.submit(self.adapter.dag_success, dag_run=dag_run, 
msg=msg)
+            if IS_AIRFLOW_2_10_OR_HIGHER:
+                task_ids = DagRun._get_partial_task_ids(dag_run.dag)
+            else:
+                task_ids = dag_run.dag.task_ids if dag_run.dag and 
dag_run.dag.partial else None
+            self.submit_callable(
+                self.adapter.dag_success,
+                dag_id=dag_run.dag_id,
+                run_id=dag_run.run_id,
+                end_date=dag_run.end_date,
+                logical_date=dag_run.logical_date,
+                task_ids=task_ids,
+                dag_run_state=dag_run.get_state(),
+            )
+        except BaseException as e:
+            self.log.warning("OpenLineage received exception in method 
on_dag_run_success", exc_info=e)
 
     @hookimpl
     def on_dag_run_failed(self, dag_run: DagRun, msg: str) -> None:
-        if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
-            self.log.debug(
-                "Skipping OpenLineage event emission for DAG `%s` "
-                "due to lack of explicit lineage enablement for DAG while "
-                "[openlineage] selective_enable is on.",
-                dag_run.dag_id,
+        try:
+            if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
+                self.log.debug(
+                    "Skipping OpenLineage event emission for DAG `%s` "
+                    "due to lack of explicit lineage enablement for DAG while "
+                    "[openlineage] selective_enable is on.",
+                    dag_run.dag_id,
+                )
+                return
+
+            if not self.executor:
+                self.log.debug("Executor have not started before 
`on_dag_run_failed`")
+                return
+
+            if IS_AIRFLOW_2_10_OR_HIGHER:
+                task_ids = DagRun._get_partial_task_ids(dag_run.dag)
+            else:
+                task_ids = dag_run.dag.task_ids if dag_run.dag and 
dag_run.dag.partial else None
+            self.submit_callable(
+                self.adapter.dag_failed,
+                dag_id=dag_run.dag_id,
+                run_id=dag_run.run_id,
+                end_date=dag_run.end_date,
+                logical_date=dag_run.logical_date,
+                dag_run_state=dag_run.get_state(),
+                task_ids=task_ids,
+                msg=msg,
             )
-            return
+        except BaseException as e:
+            self.log.warning("OpenLineage received exception in method 
on_dag_run_failed", exc_info=e)
 
-        if not self.executor:
-            self.log.debug("Executor have not started before 
`on_dag_run_failed`")
-            return
+    def submit_callable(self, callable, *args, **kwargs):
+        fut = self.executor.submit(callable, *args, **kwargs)
+        fut.add_done_callback(self.log_submit_error)
+        return fut
 
-        self.executor.submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg)
+    def log_submit_error(self, fut):
+        if fut.exception():
+            self.log.warning("Failed to submit method to executor", 
exc_info=fut.exception())
+        else:
+            self.log.debug("Successfully submitted method to executor")
 
 
 def get_openlineage_listener() -> OpenLineageListener:
diff --git a/airflow/providers/openlineage/utils/utils.py 
b/airflow/providers/openlineage/utils/utils.py
index ec58c6e2d7..f283c09e87 100644
--- a/airflow/providers/openlineage/utils/utils.py
+++ b/airflow/providers/openlineage/utils/utils.py
@@ -33,7 +33,7 @@ from packaging.version import Version
 from airflow import __version__ as AIRFLOW_VERSION
 from airflow.datasets import Dataset
 from airflow.exceptions import AirflowProviderDeprecationWarning  # TODO: move 
this maybe to Airflow's logic?
-from airflow.models import DAG, BaseOperator, MappedOperator
+from airflow.models import DAG, BaseOperator, DagRun, MappedOperator
 from airflow.providers.openlineage import conf
 from airflow.providers.openlineage.plugins.facets import (
     AirflowDagRunFacet,
@@ -58,9 +58,8 @@ if TYPE_CHECKING:
     from openlineage.client.event_v2 import Dataset as OpenLineageDataset
     from openlineage.client.facet_v2 import RunFacet
 
-    from airflow.models import DagRun, TaskInstance
-    from airflow.utils.state import TaskInstanceState
-
+    from airflow.models import TaskInstance
+    from airflow.utils.state import DagRunState, TaskInstanceState
 
 log = logging.getLogger(__name__)
 _NOMINAL_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
@@ -439,11 +438,14 @@ def get_airflow_job_facet(dag_run: DagRun) -> dict[str, 
AirflowJobFacet]:
     }
 
 
-def get_airflow_state_run_facet(dag_run: DagRun) -> dict[str, 
AirflowStateRunFacet]:
+def get_airflow_state_run_facet(
+    dag_id: str, run_id: str, task_ids: list[str], dag_run_state: DagRunState
+) -> dict[str, AirflowStateRunFacet]:
+    tis = DagRun.fetch_task_instances(dag_id=dag_id, run_id=run_id, 
task_ids=task_ids)
     return {
         "airflowState": AirflowStateRunFacet(
-            dagRunState=dag_run.get_state(),
-            tasksState={ti.task_id: ti.state for ti in 
dag_run.get_task_instances()},
+            dagRunState=dag_run_state,
+            tasksState={ti.task_id: ti.state for ti in tis},
         )
     }
 
diff --git a/tests/providers/openlineage/plugins/test_adapter.py 
b/tests/providers/openlineage/plugins/test_adapter.py
index a1e702e263..2608834708 100644
--- a/tests/providers/openlineage/plugins/test_adapter.py
+++ b/tests/providers/openlineage/plugins/test_adapter.py
@@ -528,7 +528,7 @@ def 
test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta
     mock_stats_timer.assert_called_with("ol.emit.attempts")
 
 
-@mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=False)
+@mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True)
 @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
@@ -536,7 +536,7 @@ def test_emit_dag_started_event(mock_stats_incr, 
mock_stats_timer, generate_stat
     random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
     client = MagicMock()
     adapter = OpenLineageAdapter(client)
-    event_time = datetime.datetime.now()
+    event_time = datetime.datetime.fromisoformat("2021-01-01T00:00:00+00:00")
     dag_id = "dag_id"
     run_id = str(uuid.uuid4())
 
@@ -564,14 +564,6 @@ def test_emit_dag_started_event(mock_stats_incr, 
mock_stats_timer, generate_stat
 
     job_facets = {**get_airflow_job_facet(dag_run)}
 
-    adapter.dag_started(
-        dag_run=dag_run,
-        msg="",
-        nominal_start_time=event_time.isoformat(),
-        nominal_end_time=event_time.isoformat(),
-        job_facets=job_facets,
-    )
-
     expected_dag_info = {
         "timetable": {"delta": 86400.0},
         "dag_id": dag_id,
@@ -586,6 +578,32 @@ def test_emit_dag_started_event(mock_stats_incr, 
mock_stats_timer, generate_stat
     else:  # Airflow 3 and up.
         expected_dag_info["timetable_summary"] = "1 day, 0:00:00"
 
+    dag_run_facet = AirflowDagRunFacet(
+        dag=expected_dag_info,
+        dagRun={
+            "conf": {},
+            "dag_id": "dag_id",
+            "data_interval_start": event_time.isoformat(),
+            "data_interval_end": event_time.isoformat(),
+            "external_trigger": None,
+            "run_id": run_id,
+            "run_type": None,
+            "start_date": event_time.isoformat(),
+        },
+    )
+
+    adapter.dag_started(
+        dag_id=dag_id,
+        start_date=event_time,
+        logical_date=event_time,
+        nominal_start_time=event_time.isoformat(),
+        nominal_end_time=event_time.isoformat(),
+        owners=["airflow"],
+        description=dag.description,
+        run_facets={"airflowDagRun": dag_run_facet},
+        job_facets=job_facets,
+    )
+
     assert len(client.emit.mock_calls) == 1
     client.emit.assert_called_once_with(
         RunEvent(
@@ -611,7 +629,7 @@ def test_emit_dag_started_event(mock_stats_incr, 
mock_stats_timer, generate_stat
                             "start_date": event_time.isoformat(),
                         },
                     ),
-                    # "debug": AirflowDebugRunFacet(packages=ANY),
+                    "debug": AirflowDebugRunFacet(packages=ANY),
                 },
             ),
             job=Job(
@@ -635,18 +653,17 @@ def test_emit_dag_started_event(mock_stats_incr, 
mock_stats_timer, generate_stat
             outputs=[],
         )
     )
-
     mock_stats_incr.assert_not_called()
     mock_stats_timer.assert_called_with("ol.emit.attempts")
 
 
 @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True)
-@mock.patch.object(DagRun, "get_task_instances")
+@mock.patch.object(DagRun, "fetch_task_instances")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
 def test_emit_dag_complete_event(
-    mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_get_tasks, 
mock_debug_mode
+    mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_fetch_tis, 
mock_debug_mode
 ):
     random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
     client = MagicMock()
@@ -670,7 +687,7 @@ def test_emit_dag_complete_event(
     )
     dag_run._state = DagRunState.SUCCESS
     dag_run.end_date = event_time
-    mocked_get_tasks.return_value = [
+    mocked_fetch_tis.return_value = [
         TaskInstance(task=task_0, run_id=run_id, 
state=TaskInstanceState.SUCCESS),
         TaskInstance(task=task_1, run_id=run_id, 
state=TaskInstanceState.SKIPPED),
         TaskInstance(task=task_2, run_id=run_id, 
state=TaskInstanceState.FAILED),
@@ -678,44 +695,45 @@ def test_emit_dag_complete_event(
     generate_static_uuid.return_value = random_uuid
 
     adapter.dag_success(
-        dag_run=dag_run,
-        msg="",
+        dag_id=dag_id,
+        run_id=run_id,
+        end_date=event_time,
+        logical_date=event_time,
+        dag_run_state=DagRunState.SUCCESS,
+        task_ids=["task_0", "task_1", "task_2.test"],
     )
 
-    assert (
-        call(
-            RunEvent(
-                eventType=RunState.COMPLETE,
-                eventTime=event_time.isoformat(),
-                run=Run(
-                    runId=random_uuid,
-                    facets={
-                        "airflowState": AirflowStateRunFacet(
-                            dagRunState=DagRunState.SUCCESS,
-                            tasksState={
-                                task_0.task_id: TaskInstanceState.SUCCESS,
-                                task_1.task_id: TaskInstanceState.SKIPPED,
-                                task_2.task_id: TaskInstanceState.FAILED,
-                            },
-                        ),
-                        "debug": AirflowDebugRunFacet(packages=ANY),
-                    },
-                ),
-                job=Job(
-                    namespace=namespace(),
-                    name=dag_id,
-                    facets={
-                        "jobType": job_type_job.JobTypeJobFacet(
-                            processingType="BATCH", integration="AIRFLOW", 
jobType="DAG"
-                        )
-                    },
-                ),
-                producer=_PRODUCER,
-                inputs=[],
-                outputs=[],
-            )
+    client.emit.assert_called_once_with(
+        RunEvent(
+            eventType=RunState.COMPLETE,
+            eventTime=event_time.isoformat(),
+            run=Run(
+                runId=random_uuid,
+                facets={
+                    "airflowState": AirflowStateRunFacet(
+                        dagRunState=DagRunState.SUCCESS,
+                        tasksState={
+                            task_0.task_id: TaskInstanceState.SUCCESS,
+                            task_1.task_id: TaskInstanceState.SKIPPED,
+                            task_2.task_id: TaskInstanceState.FAILED,
+                        },
+                    ),
+                    "debug": AirflowDebugRunFacet(packages=ANY),
+                },
+            ),
+            job=Job(
+                namespace=namespace(),
+                name=dag_id,
+                facets={
+                    "jobType": job_type_job.JobTypeJobFacet(
+                        processingType="BATCH", integration="AIRFLOW", 
jobType="DAG"
+                    )
+                },
+            ),
+            producer=_PRODUCER,
+            inputs=[],
+            outputs=[],
         )
-        in client.emit.mock_calls
     )
 
     mock_stats_incr.assert_not_called()
@@ -723,12 +741,12 @@ def test_emit_dag_complete_event(
 
 
 @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True)
-@mock.patch.object(DagRun, "get_task_instances")
+@mock.patch.object(DagRun, "fetch_task_instances")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
 def test_emit_dag_failed_event(
-    mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_get_tasks, 
mock_debug_mode
+    mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_fetch_tis, 
mock_debug_mode
 ):
     random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
     client = MagicMock()
@@ -748,9 +766,9 @@ def test_emit_dag_failed_event(
         start_date=event_time,
         execution_date=event_time,
     )
-    dag_run._state = DagRunState.SUCCESS
+    dag_run._state = DagRunState.FAILED
     dag_run.end_date = event_time
-    mocked_get_tasks.return_value = [
+    mocked_fetch_tis.return_value = [
         TaskInstance(task=task_0, run_id=run_id, 
state=TaskInstanceState.SUCCESS),
         TaskInstance(task=task_1, run_id=run_id, 
state=TaskInstanceState.SKIPPED),
         TaskInstance(task=task_2, run_id=run_id, 
state=TaskInstanceState.FAILED),
@@ -758,47 +776,49 @@ def test_emit_dag_failed_event(
     generate_static_uuid.return_value = random_uuid
 
     adapter.dag_failed(
-        dag_run=dag_run,
+        dag_id=dag_id,
+        run_id=run_id,
+        end_date=event_time,
+        logical_date=event_time,
+        dag_run_state=DagRunState.FAILED,
+        task_ids=["task_0", "task_1", "task_2.test"],
         msg="error msg",
     )
 
-    assert (
-        call(
-            RunEvent(
-                eventType=RunState.FAIL,
-                eventTime=event_time.isoformat(),
-                run=Run(
-                    runId=random_uuid,
-                    facets={
-                        "errorMessage": error_message_run.ErrorMessageRunFacet(
-                            message="error msg", programmingLanguage="python"
-                        ),
-                        "airflowState": AirflowStateRunFacet(
-                            dagRunState=DagRunState.SUCCESS,
-                            tasksState={
-                                task_0.task_id: TaskInstanceState.SUCCESS,
-                                task_1.task_id: TaskInstanceState.SKIPPED,
-                                task_2.task_id: TaskInstanceState.FAILED,
-                            },
-                        ),
-                        "debug": AirflowDebugRunFacet(packages=ANY),
-                    },
-                ),
-                job=Job(
-                    namespace=namespace(),
-                    name=dag_id,
-                    facets={
-                        "jobType": job_type_job.JobTypeJobFacet(
-                            processingType="BATCH", integration="AIRFLOW", 
jobType="DAG"
-                        )
-                    },
-                ),
-                producer=_PRODUCER,
-                inputs=[],
-                outputs=[],
-            )
+    client.emit.assert_called_once_with(
+        RunEvent(
+            eventType=RunState.FAIL,
+            eventTime=event_time.isoformat(),
+            run=Run(
+                runId=random_uuid,
+                facets={
+                    "errorMessage": error_message_run.ErrorMessageRunFacet(
+                        message="error msg", programmingLanguage="python"
+                    ),
+                    "airflowState": AirflowStateRunFacet(
+                        dagRunState=DagRunState.FAILED,
+                        tasksState={
+                            task_0.task_id: TaskInstanceState.SUCCESS,
+                            task_1.task_id: TaskInstanceState.SKIPPED,
+                            task_2.task_id: TaskInstanceState.FAILED,
+                        },
+                    ),
+                    "debug": AirflowDebugRunFacet(packages=ANY),
+                },
+            ),
+            job=Job(
+                namespace=namespace(),
+                name=dag_id,
+                facets={
+                    "jobType": job_type_job.JobTypeJobFacet(
+                        processingType="BATCH", integration="AIRFLOW", 
jobType="DAG"
+                    )
+                },
+            ),
+            producer=_PRODUCER,
+            inputs=[],
+            outputs=[],
         )
-        in client.emit.mock_calls
     )
 
     mock_stats_incr.assert_not_called()
diff --git a/tests/providers/openlineage/plugins/test_listener.py 
b/tests/providers/openlineage/plugins/test_listener.py
index de2732cb36..22e3f5d44b 100644
--- a/tests/providers/openlineage/plugins/test_listener.py
+++ b/tests/providers/openlineage/plugins/test_listener.py
@@ -17,22 +17,27 @@
 from __future__ import annotations
 
 import datetime as dt
+import threading
 import uuid
 from contextlib import suppress
 from typing import Callable
 from unittest import mock
-from unittest.mock import ANY, patch
+from unittest.mock import ANY, MagicMock, patch
 
 import pandas as pd
 import pytest
+from openlineage.client import OpenLineageClient
+from openlineage.client.transport import ConsoleTransport
+from openlineage.client.transport.console import ConsoleConfig
 
 from airflow.models import DAG, DagRun, TaskInstance
 from airflow.models.baseoperator import BaseOperator
 from airflow.operators.python import PythonOperator
+from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
 from airflow.providers.openlineage.plugins.facets import AirflowDebugRunFacet
 from airflow.providers.openlineage.plugins.listener import OpenLineageListener
 from airflow.providers.openlineage.utils.selective_enable import 
disable_lineage, enable_lineage
-from airflow.utils.state import State
+from airflow.utils.state import DagRunState, State
 from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS
 from tests.test_utils.config import conf_vars
 
@@ -588,6 +593,28 @@ def 
test_listener_on_dag_run_state_changes_configure_process_pool_size(mock_exec
     mock_executor.return_value.submit.assert_called_once()
 
 
+def test_listener_logs_failed_serialization():
+    listener = OpenLineageListener()
+    listener.log = MagicMock()
+    listener.adapter = OpenLineageAdapter(
+        
client=OpenLineageClient(transport=ConsoleTransport(config=ConsoleConfig()))
+    )
+    event_time = dt.datetime.now()
+
+    fut = listener.submit_callable(
+        listener.adapter.dag_failed,
+        dag_id="",
+        run_id="",
+        end_date=event_time,
+        execution_date=threading.Thread(),
+        dag_run_state=DagRunState.FAILED,
+        task_ids=["task_id"],
+        msg="",
+    )
+    assert fut.exception(10)
+    listener.log.warning.assert_called_once()
+
+
 class TestOpenLineageSelectiveEnable:
     def setup_method(self):
         self.dag = DAG(

Reply via email to