This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new aa23bfdbc7 feat: notify about potential serialization failures when
sending DagRun, don't serialize unnecessary params, guard listener for
exceptions (#41690)
aa23bfdbc7 is described below
commit aa23bfdbc735645b2cdeda4bb1360b60ae60e6e1
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Mon Sep 2 15:59:13 2024 +0200
feat: notify about potential serialization failures when sending DagRun,
don't serialize unnecessary params, guard listener for exceptions (#41690)
Signed-off-by: Maciej Obuchowski <[email protected]>
---
airflow/providers/openlineage/plugins/adapter.py | 72 +++++---
airflow/providers/openlineage/plugins/listener.py | 149 ++++++++++-----
airflow/providers/openlineage/utils/utils.py | 16 +-
.../providers/openlineage/plugins/test_adapter.py | 202 +++++++++++----------
.../providers/openlineage/plugins/test_listener.py | 31 +++-
5 files changed, 299 insertions(+), 171 deletions(-)
diff --git a/airflow/providers/openlineage/plugins/adapter.py
b/airflow/providers/openlineage/plugins/adapter.py
index 847de5953d..8cd6e6c605 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -40,7 +40,6 @@ from openlineage.client.uuid import generate_static_uuid
from airflow.providers.openlineage import __version__ as
OPENLINEAGE_PROVIDER_VERSION, conf
from airflow.providers.openlineage.utils.utils import (
OpenLineageRedactor,
- get_airflow_dag_run_facet,
get_airflow_debug_facet,
get_airflow_state_run_facet,
)
@@ -50,9 +49,9 @@ from airflow.utils.log.logging_mixin import LoggingMixin
if TYPE_CHECKING:
from datetime import datetime
- from airflow.models.dagrun import DagRun
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.utils.log.secrets_masker import SecretsMasker
+ from airflow.utils.state import DagRunState
_PRODUCER =
f"https://github.com/apache/airflow/tree/providers-openlineage/{OPENLINEAGE_PROVIDER_VERSION}"
@@ -336,33 +335,36 @@ class OpenLineageAdapter(LoggingMixin):
def dag_started(
self,
- dag_run: DagRun,
- msg: str,
+ dag_id: str,
+ logical_date: datetime,
+ start_date: datetime,
nominal_start_time: str,
nominal_end_time: str,
+ owners: list[str],
+ run_facets: dict[str, RunFacet],
+ description: str | None = None,
job_facets: dict[str, JobFacet] | None = None, # Custom job facets
):
try:
- owner = [x.strip() for x in dag_run.dag.owner.split(",")] if
dag_run.dag else None
event = RunEvent(
eventType=RunState.START,
- eventTime=dag_run.start_date.isoformat(),
+ eventTime=start_date.isoformat(),
job=self._build_job(
- job_name=dag_run.dag_id,
+ job_name=dag_id,
job_type=_JOB_TYPE_DAG,
- job_description=dag_run.dag.description if dag_run.dag
else None,
- owners=owner,
+ job_description=description,
+ owners=owners,
job_facets=job_facets,
),
run=self._build_run(
run_id=self.build_dag_run_id(
- dag_id=dag_run.dag_id,
- logical_date=dag_run.logical_date,
+ dag_id=dag_id,
+ logical_date=logical_date,
),
- job_name=dag_run.dag_id,
+ job_name=dag_id,
nominal_start_time=nominal_start_time,
nominal_end_time=nominal_end_time,
- run_facets={**get_airflow_dag_run_facet(dag_run),
**get_airflow_debug_facet()},
+ run_facets={**run_facets, **get_airflow_debug_facet()},
),
inputs=[],
outputs=[],
@@ -375,18 +377,29 @@ class OpenLineageAdapter(LoggingMixin):
# This part cannot be wrapped to deduplicate code, otherwise the
method cannot be pickled in multiprocessing.
self.log.warning("Failed to emit DAG started event: \n %s",
traceback.format_exc())
- def dag_success(self, dag_run: DagRun, msg: str):
+ def dag_success(
+ self,
+ dag_id: str,
+ run_id: str,
+ end_date: datetime,
+ logical_date: datetime,
+ dag_run_state: DagRunState,
+ task_ids: list[str],
+ ):
try:
event = RunEvent(
eventType=RunState.COMPLETE,
- eventTime=dag_run.end_date.isoformat(),
- job=self._build_job(job_name=dag_run.dag_id,
job_type=_JOB_TYPE_DAG),
+ eventTime=end_date.isoformat(),
+ job=self._build_job(job_name=dag_id, job_type=_JOB_TYPE_DAG),
run=Run(
runId=self.build_dag_run_id(
- dag_id=dag_run.dag_id,
- logical_date=dag_run.logical_date,
+ dag_id=dag_id,
+ logical_date=logical_date,
),
- facets={**get_airflow_state_run_facet(dag_run),
**get_airflow_debug_facet()},
+ facets={
+ **get_airflow_state_run_facet(dag_id, run_id,
task_ids, dag_run_state),
+ **get_airflow_debug_facet(),
+ },
),
inputs=[],
outputs=[],
@@ -399,22 +412,31 @@ class OpenLineageAdapter(LoggingMixin):
# This part cannot be wrapped to deduplicate code, otherwise the
method cannot be pickled in multiprocessing.
self.log.warning("Failed to emit DAG success event: \n %s",
traceback.format_exc())
- def dag_failed(self, dag_run: DagRun, msg: str):
+ def dag_failed(
+ self,
+ dag_id: str,
+ run_id: str,
+ end_date: datetime,
+ logical_date: datetime,
+ dag_run_state: DagRunState,
+ task_ids: list[str],
+ msg: str,
+ ):
try:
event = RunEvent(
eventType=RunState.FAIL,
- eventTime=dag_run.end_date.isoformat(),
- job=self._build_job(job_name=dag_run.dag_id,
job_type=_JOB_TYPE_DAG),
+ eventTime=end_date.isoformat(),
+ job=self._build_job(job_name=dag_id, job_type=_JOB_TYPE_DAG),
run=Run(
runId=self.build_dag_run_id(
- dag_id=dag_run.dag_id,
- logical_date=dag_run.logical_date,
+ dag_id=dag_id,
+ logical_date=logical_date,
),
facets={
"errorMessage": error_message_run.ErrorMessageRunFacet(
message=msg, programmingLanguage="python"
),
- **get_airflow_state_run_facet(dag_run),
+ **get_airflow_state_run_facet(dag_id, run_id,
task_ids, dag_run_state),
**get_airflow_debug_facet(),
},
),
diff --git a/airflow/providers/openlineage/plugins/listener.py
b/airflow/providers/openlineage/plugins/listener.py
index 7882e188d9..fbe50e4a72 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -27,11 +27,13 @@ from setproctitle import getproctitle, setproctitle
from airflow import settings
from airflow.listeners import hookimpl
+from airflow.models import DagRun
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.extractors import ExtractorManager
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter,
RunState
from airflow.providers.openlineage.utils.utils import (
IS_AIRFLOW_2_10_OR_HIGHER,
+ get_airflow_dag_run_facet,
get_airflow_debug_facet,
get_airflow_job_facet,
get_airflow_mapped_task_facet,
@@ -51,7 +53,7 @@ from airflow.utils.timeout import timeout
if TYPE_CHECKING:
from sqlalchemy.orm import Session
- from airflow.models import DagRun, TaskInstance
+ from airflow.models import TaskInstance
_openlineage_listener: OpenLineageListener | None = None
@@ -413,65 +415,120 @@ class OpenLineageListener:
@hookimpl
def on_dag_run_running(self, dag_run: DagRun, msg: str) -> None:
- if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
- self.log.debug(
- "Skipping OpenLineage event emission for DAG `%s` "
- "due to lack of explicit lineage enablement for DAG while "
- "[openlineage] selective_enable is on.",
- dag_run.dag_id,
+ """Submit an OpenLineage START event for a DAG run that began running; any error is only logged."""
+ try:
+ if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
+ self.log.debug(
+ "Skipping OpenLineage event emission for DAG `%s` "
+ "due to lack of explicit lineage enablement for DAG while "
+ "[openlineage] selective_enable is on.",
+ dag_run.dag_id,
+ )
+ return
+
+ if not self.executor:
+ self.log.debug("Executor have not started before `on_dag_run_running`")
+ return
+
+ data_interval_start = (
+ dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None
)
- return
+ data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None
- if not self.executor:
- self.log.debug("Executor have not started before `on_dag_run_running`")
- return
+ run_facets = {**get_airflow_dag_run_facet(dag_run)}
- data_interval_start = dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None
- data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None
- self.executor.submit(
- self.adapter.dag_started,
- dag_run=dag_run,
- msg=msg,
- nominal_start_time=data_interval_start,
- nominal_end_time=data_interval_end,
- # AirflowJobFacet should be created outside ProcessPoolExecutor that pickles objects,
- # as it causes lack of some TaskGroup attributes and crashes event emission.
- job_facets=get_airflow_job_facet(dag_run=dag_run),
- )
+ # adapter.dag_started takes no run_id (it derives the run id from dag_id + logical_date);
+ # passing one would raise TypeError in the executor and the START event would never be emitted.
+ self.submit_callable(
+ self.adapter.dag_started,
+ dag_id=dag_run.dag_id,
+ logical_date=dag_run.logical_date,
+ start_date=dag_run.start_date,
+ nominal_start_time=data_interval_start,
+ nominal_end_time=data_interval_end,
+ run_facets=run_facets,
+ owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None,
+ description=dag_run.dag.description if dag_run.dag else None,
+ # AirflowJobFacet should be created outside ProcessPoolExecutor that pickles objects,
+ # as it causes lack of some TaskGroup attributes and crashes event emission.
+ job_facets=get_airflow_job_facet(dag_run=dag_run),
+ )
+ except BaseException as e:
+ self.log.warning("OpenLineage received exception in method on_dag_run_running", exc_info=e)
@hookimpl
def on_dag_run_success(self, dag_run: DagRun, msg: str) -> None:
- if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
- self.log.debug(
- "Skipping OpenLineage event emission for DAG `%s` "
- "due to lack of explicit lineage enablement for DAG while "
- "[openlineage] selective_enable is on.",
- dag_run.dag_id,
- )
- return
+ # Submit an OpenLineage COMPLETE event for a successfully finished DAG run; any error is only logged.
+ try:
+ if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
+ self.log.debug(
+ "Skipping OpenLineage event emission for DAG `%s` "
+ "due to lack of explicit lineage enablement for DAG while "
+ "[openlineage] selective_enable is on.",
+ dag_run.dag_id,
+ )
+ return
- if not self.executor:
- self.log.debug("Executor have not started before
`on_dag_run_success`")
- return
+ if not self.executor:
+ self.log.debug("Executor have not started before
`on_dag_run_success`")
+ return
- self.executor.submit(self.adapter.dag_success, dag_run=dag_run,
msg=msg)
+ # Task ids used to restrict the reported task states; the pre-2.10 branch yields None when the
+ # DAG is not partial (NOTE(review): presumably "all task instances" downstream — confirm).
+ if IS_AIRFLOW_2_10_OR_HIGHER:
+ task_ids = DagRun._get_partial_task_ids(dag_run.dag)
+ else:
+ task_ids = dag_run.dag.task_ids if dag_run.dag and
dag_run.dag.partial else None
+ # Individual DagRun fields are passed instead of the DagRun object so fewer objects are pickled.
+ self.submit_callable(
+ self.adapter.dag_success,
+ dag_id=dag_run.dag_id,
+ run_id=dag_run.run_id,
+ end_date=dag_run.end_date,
+ logical_date=dag_run.logical_date,
+ task_ids=task_ids,
+ dag_run_state=dag_run.get_state(),
+ )
+ # Never let an OpenLineage failure propagate out of the listener hook; it is only logged.
+ except BaseException as e:
+ self.log.warning("OpenLineage received exception in method
on_dag_run_success", exc_info=e)
@hookimpl
def on_dag_run_failed(self, dag_run: DagRun, msg: str) -> None:
- if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
- self.log.debug(
- "Skipping OpenLineage event emission for DAG `%s` "
- "due to lack of explicit lineage enablement for DAG while "
- "[openlineage] selective_enable is on.",
- dag_run.dag_id,
+ """Submit an OpenLineage FAIL event for a failed DAG run; any error is only logged."""
+ try:
+ if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
+ self.log.debug(
+ "Skipping OpenLineage event emission for DAG `%s` "
+ "due to lack of explicit lineage enablement for DAG while "
+ "[openlineage] selective_enable is on.",
+ dag_run.dag_id,
+ )
+ return
+
+ if not self.executor:
+ self.log.debug("Executor have not started before `on_dag_run_failed`")
+ return
+
+ if IS_AIRFLOW_2_10_OR_HIGHER:
+ task_ids = DagRun._get_partial_task_ids(dag_run.dag)
+ else:
+ task_ids = dag_run.dag.task_ids if dag_run.dag and dag_run.dag.partial else None
+ self.submit_callable(
+ self.adapter.dag_failed,
+ dag_id=dag_run.dag_id,
+ run_id=dag_run.run_id,
+ end_date=dag_run.end_date,
+ logical_date=dag_run.logical_date,
+ dag_run_state=dag_run.get_state(),
+ task_ids=task_ids,
+ msg=msg,
)
- return
+ except BaseException as e:
+ self.log.warning("OpenLineage received exception in method on_dag_run_failed", exc_info=e)
- if not self.executor:
- self.log.debug("Executor have not started before `on_dag_run_failed`")
- return
+ def submit_callable(self, callable, *args, **kwargs):
+ """Submit ``callable`` to the listener's executor and attach ``log_submit_error`` as done-callback."""
+ fut = self.executor.submit(callable, *args, **kwargs)
+ fut.add_done_callback(self.log_submit_error)
+ return fut
- self.executor.submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg)
+ def log_submit_error(self, fut):
+ """Log the outcome of a future returned by ``submit_callable``."""
+ # A cancelled future carries no exception; fut.exception() would raise CancelledError here.
+ if fut.cancelled():
+ self.log.debug("Method submitted to executor was cancelled")
+ return
+ # Fetch once. This covers arguments that failed to serialize as well as errors raised by the call.
+ exc = fut.exception()
+ if exc:
+ self.log.warning("Failed to submit method to executor", exc_info=exc)
+ else:
+ self.log.debug("Successfully submitted method to executor")
def get_openlineage_listener() -> OpenLineageListener:
diff --git a/airflow/providers/openlineage/utils/utils.py
b/airflow/providers/openlineage/utils/utils.py
index ec58c6e2d7..f283c09e87 100644
--- a/airflow/providers/openlineage/utils/utils.py
+++ b/airflow/providers/openlineage/utils/utils.py
@@ -33,7 +33,7 @@ from packaging.version import Version
from airflow import __version__ as AIRFLOW_VERSION
from airflow.datasets import Dataset
from airflow.exceptions import AirflowProviderDeprecationWarning # TODO: move
this maybe to Airflow's logic?
-from airflow.models import DAG, BaseOperator, MappedOperator
+from airflow.models import DAG, BaseOperator, DagRun, MappedOperator
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.plugins.facets import (
AirflowDagRunFacet,
@@ -58,9 +58,8 @@ if TYPE_CHECKING:
from openlineage.client.event_v2 import Dataset as OpenLineageDataset
from openlineage.client.facet_v2 import RunFacet
- from airflow.models import DagRun, TaskInstance
- from airflow.utils.state import TaskInstanceState
-
+ from airflow.models import TaskInstance
+ from airflow.utils.state import DagRunState, TaskInstanceState
log = logging.getLogger(__name__)
_NOMINAL_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
@@ -439,11 +438,14 @@ def get_airflow_job_facet(dag_run: DagRun) -> dict[str,
AirflowJobFacet]:
}
-def get_airflow_state_run_facet(dag_run: DagRun) -> dict[str, AirflowStateRunFacet]:
+def get_airflow_state_run_facet(
+ dag_id: str, run_id: str, task_ids: list[str] | None, dag_run_state: DagRunState
+) -> dict[str, AirflowStateRunFacet]:
+ """
+ Build the ``airflowState`` run facet from a DAG run state and its task instance states.
+
+ :param task_ids: task ids whose states are reported; the listener passes None for non-partial
+ DAGs (presumably meaning every task instance of the run — confirm in DagRun.fetch_task_instances).
+ """
+ tis = DagRun.fetch_task_instances(dag_id=dag_id, run_id=run_id, task_ids=task_ids)
return {
"airflowState": AirflowStateRunFacet(
- dagRunState=dag_run.get_state(),
- tasksState={ti.task_id: ti.state for ti in dag_run.get_task_instances()},
+ dagRunState=dag_run_state,
+ tasksState={ti.task_id: ti.state for ti in tis},
)
}
diff --git a/tests/providers/openlineage/plugins/test_adapter.py
b/tests/providers/openlineage/plugins/test_adapter.py
index a1e702e263..2608834708 100644
--- a/tests/providers/openlineage/plugins/test_adapter.py
+++ b/tests/providers/openlineage/plugins/test_adapter.py
@@ -528,7 +528,7 @@ def
test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta
mock_stats_timer.assert_called_with("ol.emit.attempts")
-@mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=False)
+@mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True)
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
@@ -536,7 +536,7 @@ def test_emit_dag_started_event(mock_stats_incr,
mock_stats_timer, generate_stat
random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
client = MagicMock()
adapter = OpenLineageAdapter(client)
- event_time = datetime.datetime.now()
+ event_time = datetime.datetime.fromisoformat("2021-01-01T00:00:00+00:00")
dag_id = "dag_id"
run_id = str(uuid.uuid4())
@@ -564,14 +564,6 @@ def test_emit_dag_started_event(mock_stats_incr,
mock_stats_timer, generate_stat
job_facets = {**get_airflow_job_facet(dag_run)}
- adapter.dag_started(
- dag_run=dag_run,
- msg="",
- nominal_start_time=event_time.isoformat(),
- nominal_end_time=event_time.isoformat(),
- job_facets=job_facets,
- )
-
expected_dag_info = {
"timetable": {"delta": 86400.0},
"dag_id": dag_id,
@@ -586,6 +578,32 @@ def test_emit_dag_started_event(mock_stats_incr,
mock_stats_timer, generate_stat
else: # Airflow 3 and up.
expected_dag_info["timetable_summary"] = "1 day, 0:00:00"
+ dag_run_facet = AirflowDagRunFacet(
+ dag=expected_dag_info,
+ dagRun={
+ "conf": {},
+ "dag_id": "dag_id",
+ "data_interval_start": event_time.isoformat(),
+ "data_interval_end": event_time.isoformat(),
+ "external_trigger": None,
+ "run_id": run_id,
+ "run_type": None,
+ "start_date": event_time.isoformat(),
+ },
+ )
+
+ adapter.dag_started(
+ dag_id=dag_id,
+ start_date=event_time,
+ logical_date=event_time,
+ nominal_start_time=event_time.isoformat(),
+ nominal_end_time=event_time.isoformat(),
+ owners=["airflow"],
+ description=dag.description,
+ run_facets={"airflowDagRun": dag_run_facet},
+ job_facets=job_facets,
+ )
+
assert len(client.emit.mock_calls) == 1
client.emit.assert_called_once_with(
RunEvent(
@@ -611,7 +629,7 @@ def test_emit_dag_started_event(mock_stats_incr,
mock_stats_timer, generate_stat
"start_date": event_time.isoformat(),
},
),
- # "debug": AirflowDebugRunFacet(packages=ANY),
+ "debug": AirflowDebugRunFacet(packages=ANY),
},
),
job=Job(
@@ -635,18 +653,17 @@ def test_emit_dag_started_event(mock_stats_incr,
mock_stats_timer, generate_stat
outputs=[],
)
)
-
mock_stats_incr.assert_not_called()
mock_stats_timer.assert_called_with("ol.emit.attempts")
@mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True)
-@mock.patch.object(DagRun, "get_task_instances")
+@mock.patch.object(DagRun, "fetch_task_instances")
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
def test_emit_dag_complete_event(
- mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_get_tasks,
mock_debug_mode
+ mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_fetch_tis,
mock_debug_mode
):
random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
client = MagicMock()
@@ -670,7 +687,7 @@ def test_emit_dag_complete_event(
)
dag_run._state = DagRunState.SUCCESS
dag_run.end_date = event_time
- mocked_get_tasks.return_value = [
+ mocked_fetch_tis.return_value = [
TaskInstance(task=task_0, run_id=run_id,
state=TaskInstanceState.SUCCESS),
TaskInstance(task=task_1, run_id=run_id,
state=TaskInstanceState.SKIPPED),
TaskInstance(task=task_2, run_id=run_id,
state=TaskInstanceState.FAILED),
@@ -678,44 +695,45 @@ def test_emit_dag_complete_event(
generate_static_uuid.return_value = random_uuid
adapter.dag_success(
- dag_run=dag_run,
- msg="",
+ dag_id=dag_id,
+ run_id=run_id,
+ end_date=event_time,
+ logical_date=event_time,
+ dag_run_state=DagRunState.SUCCESS,
+ task_ids=["task_0", "task_1", "task_2.test"],
)
- assert (
- call(
- RunEvent(
- eventType=RunState.COMPLETE,
- eventTime=event_time.isoformat(),
- run=Run(
- runId=random_uuid,
- facets={
- "airflowState": AirflowStateRunFacet(
- dagRunState=DagRunState.SUCCESS,
- tasksState={
- task_0.task_id: TaskInstanceState.SUCCESS,
- task_1.task_id: TaskInstanceState.SKIPPED,
- task_2.task_id: TaskInstanceState.FAILED,
- },
- ),
- "debug": AirflowDebugRunFacet(packages=ANY),
- },
- ),
- job=Job(
- namespace=namespace(),
- name=dag_id,
- facets={
- "jobType": job_type_job.JobTypeJobFacet(
- processingType="BATCH", integration="AIRFLOW",
jobType="DAG"
- )
- },
- ),
- producer=_PRODUCER,
- inputs=[],
- outputs=[],
- )
+ client.emit.assert_called_once_with(
+ RunEvent(
+ eventType=RunState.COMPLETE,
+ eventTime=event_time.isoformat(),
+ run=Run(
+ runId=random_uuid,
+ facets={
+ "airflowState": AirflowStateRunFacet(
+ dagRunState=DagRunState.SUCCESS,
+ tasksState={
+ task_0.task_id: TaskInstanceState.SUCCESS,
+ task_1.task_id: TaskInstanceState.SKIPPED,
+ task_2.task_id: TaskInstanceState.FAILED,
+ },
+ ),
+ "debug": AirflowDebugRunFacet(packages=ANY),
+ },
+ ),
+ job=Job(
+ namespace=namespace(),
+ name=dag_id,
+ facets={
+ "jobType": job_type_job.JobTypeJobFacet(
+ processingType="BATCH", integration="AIRFLOW",
jobType="DAG"
+ )
+ },
+ ),
+ producer=_PRODUCER,
+ inputs=[],
+ outputs=[],
)
- in client.emit.mock_calls
)
mock_stats_incr.assert_not_called()
@@ -723,12 +741,12 @@ def test_emit_dag_complete_event(
@mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True)
-@mock.patch.object(DagRun, "get_task_instances")
+@mock.patch.object(DagRun, "fetch_task_instances")
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
def test_emit_dag_failed_event(
- mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_get_tasks,
mock_debug_mode
+ mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_fetch_tis,
mock_debug_mode
):
random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
client = MagicMock()
@@ -748,9 +766,9 @@ def test_emit_dag_failed_event(
start_date=event_time,
execution_date=event_time,
)
- dag_run._state = DagRunState.SUCCESS
+ dag_run._state = DagRunState.FAILED
dag_run.end_date = event_time
- mocked_get_tasks.return_value = [
+ mocked_fetch_tis.return_value = [
TaskInstance(task=task_0, run_id=run_id,
state=TaskInstanceState.SUCCESS),
TaskInstance(task=task_1, run_id=run_id,
state=TaskInstanceState.SKIPPED),
TaskInstance(task=task_2, run_id=run_id,
state=TaskInstanceState.FAILED),
@@ -758,47 +776,49 @@ def test_emit_dag_failed_event(
generate_static_uuid.return_value = random_uuid
adapter.dag_failed(
- dag_run=dag_run,
+ dag_id=dag_id,
+ run_id=run_id,
+ end_date=event_time,
+ logical_date=event_time,
+ dag_run_state=DagRunState.FAILED,
+ task_ids=["task_0", "task_1", "task_2.test"],
msg="error msg",
)
- assert (
- call(
- RunEvent(
- eventType=RunState.FAIL,
- eventTime=event_time.isoformat(),
- run=Run(
- runId=random_uuid,
- facets={
- "errorMessage": error_message_run.ErrorMessageRunFacet(
- message="error msg", programmingLanguage="python"
- ),
- "airflowState": AirflowStateRunFacet(
- dagRunState=DagRunState.SUCCESS,
- tasksState={
- task_0.task_id: TaskInstanceState.SUCCESS,
- task_1.task_id: TaskInstanceState.SKIPPED,
- task_2.task_id: TaskInstanceState.FAILED,
- },
- ),
- "debug": AirflowDebugRunFacet(packages=ANY),
- },
- ),
- job=Job(
- namespace=namespace(),
- name=dag_id,
- facets={
- "jobType": job_type_job.JobTypeJobFacet(
- processingType="BATCH", integration="AIRFLOW",
jobType="DAG"
- )
- },
- ),
- producer=_PRODUCER,
- inputs=[],
- outputs=[],
- )
+ client.emit.assert_called_once_with(
+ RunEvent(
+ eventType=RunState.FAIL,
+ eventTime=event_time.isoformat(),
+ run=Run(
+ runId=random_uuid,
+ facets={
+ "errorMessage": error_message_run.ErrorMessageRunFacet(
+ message="error msg", programmingLanguage="python"
+ ),
+ "airflowState": AirflowStateRunFacet(
+ dagRunState=DagRunState.FAILED,
+ tasksState={
+ task_0.task_id: TaskInstanceState.SUCCESS,
+ task_1.task_id: TaskInstanceState.SKIPPED,
+ task_2.task_id: TaskInstanceState.FAILED,
+ },
+ ),
+ "debug": AirflowDebugRunFacet(packages=ANY),
+ },
+ ),
+ job=Job(
+ namespace=namespace(),
+ name=dag_id,
+ facets={
+ "jobType": job_type_job.JobTypeJobFacet(
+ processingType="BATCH", integration="AIRFLOW",
jobType="DAG"
+ )
+ },
+ ),
+ producer=_PRODUCER,
+ inputs=[],
+ outputs=[],
)
- in client.emit.mock_calls
)
mock_stats_incr.assert_not_called()
diff --git a/tests/providers/openlineage/plugins/test_listener.py
b/tests/providers/openlineage/plugins/test_listener.py
index de2732cb36..22e3f5d44b 100644
--- a/tests/providers/openlineage/plugins/test_listener.py
+++ b/tests/providers/openlineage/plugins/test_listener.py
@@ -17,22 +17,27 @@
from __future__ import annotations
import datetime as dt
+import threading
import uuid
from contextlib import suppress
from typing import Callable
from unittest import mock
-from unittest.mock import ANY, patch
+from unittest.mock import ANY, MagicMock, patch
import pandas as pd
import pytest
+from openlineage.client import OpenLineageClient
+from openlineage.client.transport import ConsoleTransport
+from openlineage.client.transport.console import ConsoleConfig
from airflow.models import DAG, DagRun, TaskInstance
from airflow.models.baseoperator import BaseOperator
from airflow.operators.python import PythonOperator
+from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
from airflow.providers.openlineage.plugins.facets import AirflowDebugRunFacet
from airflow.providers.openlineage.plugins.listener import OpenLineageListener
from airflow.providers.openlineage.utils.selective_enable import
disable_lineage, enable_lineage
-from airflow.utils.state import State
+from airflow.utils.state import DagRunState, State
from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS
from tests.test_utils.config import conf_vars
@@ -588,6 +593,28 @@ def
test_listener_on_dag_run_state_changes_configure_process_pool_size(mock_exec
mock_executor.return_value.submit.assert_called_once()
+def test_listener_logs_failed_serialization():
+ listener = OpenLineageListener()
+ listener.log = MagicMock()
+ # Future done-callbacks run only after waiters of fut.exception() are woken (possibly on another
+ # thread), so record when the warning fires instead of asserting on it immediately.
+ warned = threading.Event()
+ listener.log.warning.side_effect = lambda *args, **kwargs: warned.set()
+ listener.adapter = OpenLineageAdapter(
+ client=OpenLineageClient(transport=ConsoleTransport(config=ConsoleConfig()))
+ )
+ event_time = dt.datetime.now()
+
+ fut = listener.submit_callable(
+ listener.adapter.dag_failed,
+ dag_id="",
+ run_id="",
+ end_date=event_time,
+ # Real parameter name, so the failure comes from serializing the Thread object
+ # rather than from an unexpected keyword argument.
+ logical_date=threading.Thread(),
+ dag_run_state=DagRunState.FAILED,
+ task_ids=["task_id"],
+ msg="",
+ )
+ assert fut.exception(10)
+ assert warned.wait(10)
+ listener.log.warning.assert_called_once()
+
+
class TestOpenLineageSelectiveEnable:
def setup_method(self):
self.dag = DAG(