This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1951f9b1a72 Fix openlineage dag state event emit on timed out dag
(#56542)
1951f9b1a72 is described below
commit 1951f9b1a72569b51ac962f1af6dd1866d09e299
Author: antonlin1 <[email protected]>
AuthorDate: Fri Oct 10 15:53:20 2025 +0200
Fix openlineage dag state event emit on timed out dag (#56542)
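* Fix OpenLineage DAG state event emission for DAG runs with skipped tasks:
  task instances with no duration and a missing start_date/end_date no longer
  make the duration computation raise; they now report a duration of 0.0
* Fix tests
* Remove the else branch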
---
.../airflow/providers/openlineage/utils/utils.py | 16 ++++--
.../tests/unit/openlineage/utils/test_utils.py | 64 ++++++++++++++++++++++
2 files changed, 74 insertions(+), 6 deletions(-)
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index 34dea63925e..b2e483315fc 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -751,16 +751,20 @@ def get_airflow_state_run_facet(
dag_id: str, run_id: str, task_ids: list[str], dag_run_state: DagRunState
) -> dict[str, AirflowStateRunFacet]:
tis = DagRun.fetch_task_instances(dag_id=dag_id, run_id=run_id,
task_ids=task_ids)
+
+ def get_task_duration(ti):
+ if ti.duration is not None:
+ return ti.duration
+ if ti.end_date is not None and ti.start_date is not None:
+ return (ti.end_date - ti.start_date).total_seconds()
+ # Fallback to 0.0 for tasks with missing timestamps (e.g.,
skipped/terminated tasks)
+ return 0.0
+
return {
"airflowState": AirflowStateRunFacet(
dagRunState=dag_run_state,
tasksState={ti.task_id: ti.state for ti in tis},
- tasksDuration={
- ti.task_id: ti.duration
- if ti.duration is not None
- else (ti.end_date - ti.start_date).total_seconds()
- for ti in tis
- },
+ tasksDuration={ti.task_id: get_task_duration(ti) for ti in tis},
)
}
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
index 38f816e2f61..2d82b6b6ca5 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
@@ -44,6 +44,7 @@ from airflow.providers.openlineage.utils.utils import (
_truncate_string_to_byte_size,
get_airflow_dag_run_facet,
get_airflow_job_facet,
+ get_airflow_state_run_facet,
get_dag_documentation,
get_fully_qualified_class_name,
get_job_name,
@@ -57,6 +58,7 @@ from airflow.serialization.serialized_objects import
SerializedBaseOperator
from airflow.timetables.events import EventsTimetable
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.utils import timezone
+from airflow.utils.session import create_session
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
@@ -2054,3 +2056,65 @@ def
test_get_operator_provider_version_for_mapped_operator(mock_providers_manage
mapped_operator =
BashOperator.partial(task_id="test_task").expand(bash_command=["echo 1", "echo
2"])
result = get_operator_provider_version(mapped_operator)
assert result == "1.2.0"
+
+
+class TestGetAirflowStateRunFacet:
+ @pytest.mark.db_test
+ def test_task_with_timestamps_defined(self, dag_maker):
+ """Test task instance with defined start_date and end_date."""
+ with dag_maker(dag_id="test_dag"):
+ BaseOperator(task_id="test_task")
+
+ dag_run = dag_maker.create_dagrun()
+ ti = dag_run.get_task_instance(task_id="test_task")
+
+ # Both timestamps set and duration cleared, forcing the end_date - start_date path
+ start_time = pendulum.parse("2024-01-01T10:00:00Z")
+ end_time = pendulum.parse("2024-01-01T10:02:30Z") # 150 seconds
difference
+ ti.start_date = start_time
+ ti.end_date = end_time
+ ti.state = TaskInstanceState.SUCCESS
+ ti.duration = None
+
+ # Persist to the DB: the function under test re-fetches TIs via DagRun.fetch_task_instances
+ with create_session() as session:
+ session.merge(ti)
+ session.commit()
+
+ result = get_airflow_state_run_facet(
+ dag_id="test_dag",
+ run_id=dag_run.run_id,
+ task_ids=["test_task"],
+ dag_run_state=DagRunState.SUCCESS,
+ )
+
+ assert result["airflowState"].tasksDuration["test_task"] == 150.0
+
+ @pytest.mark.db_test
+ def test_task_with_none_timestamps_fallback_to_zero(self, dag_maker):
+ """Test task with None timestamps falls back to 0.0."""
+ with dag_maker(dag_id="test_dag"):
+ BaseOperator(task_id="terminated_task")
+
+ dag_run = dag_maker.create_dagrun()
+ ti = dag_run.get_task_instance(task_id="terminated_task")
+
+ # No timestamps and no duration recorded (e.g. a skipped or signal-terminated task)
+ ti.start_date = None
+ ti.end_date = None
+ ti.state = TaskInstanceState.SKIPPED
+ ti.duration = None
+
+ # Persist to the DB: the function under test re-fetches TIs via DagRun.fetch_task_instances
+ with create_session() as session:
+ session.merge(ti)
+ session.commit()
+
+ result = get_airflow_state_run_facet(
+ dag_id="test_dag",
+ run_id=dag_run.run_id,
+ task_ids=["terminated_task"],
+ dag_run_state=DagRunState.FAILED,
+ )
+
+ assert result["airflowState"].tasksDuration["terminated_task"] == 0.0