This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 23d5076635 openlineage: defensively check for provided datetimes in listener (#33343)
23d5076635 is described below
commit 23d507663541ab49f02d7863d42f9baf458cc48f
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Sun Aug 13 18:17:47 2023 +0200
openlineage: defensively check for provided datetimes in listener (#33343)
Signed-off-by: Maciej Obuchowski <[email protected]>
---
airflow/providers/openlineage/plugins/adapter.py | 4 ++--
airflow/providers/openlineage/plugins/listener.py | 28 +++++++++++++++++------
2 files changed, 23 insertions(+), 9 deletions(-)
diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py
index 0f19aa0f14..f470631b07 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -127,8 +127,8 @@ class OpenLineageAdapter(LoggingMixin):
parent_job_name: str | None,
parent_run_id: str | None,
code_location: str | None,
- nominal_start_time: str,
- nominal_end_time: str,
+ nominal_start_time: str | None,
+ nominal_end_time: str | None,
owners: list[str],
task: OperatorLineage | None,
run_facets: dict[str, BaseFacet] | None = None, # Custom run facets
diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 99394863f5..d85a559f56 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -18,6 +18,7 @@ from __future__ import annotations
import logging
from concurrent.futures import Executor, ThreadPoolExecutor
+from datetime import datetime
from typing import TYPE_CHECKING
from airflow.listeners import hookimpl
@@ -76,16 +77,22 @@ class OpenLineageListener:
task_metadata = self.extractor_manager.extract_metadata(dagrun,
task)
+ start_date = task_instance.start_date if task_instance.start_date
else datetime.now()
+ data_interval_start = (
+ dagrun.data_interval_start.isoformat() if
dagrun.data_interval_start else None
+ )
+ data_interval_end = dagrun.data_interval_end.isoformat() if
dagrun.data_interval_end else None
+
self.adapter.start_task(
run_id=task_uuid,
job_name=get_job_name(task),
job_description=dag.description,
- event_time=task_instance.start_date.isoformat(),
+ event_time=start_date.isoformat(),
parent_job_name=dag.dag_id,
parent_run_id=parent_run_id,
code_location=None,
- nominal_start_time=dagrun.data_interval_start.isoformat(),
- nominal_end_time=dagrun.data_interval_end.isoformat(),
+ nominal_start_time=data_interval_start,
+ nominal_end_time=data_interval_end,
owners=dag.owner.split(", "),
task=task_metadata,
run_facets={
@@ -113,10 +120,13 @@ class OpenLineageListener:
task_metadata = self.extractor_manager.extract_metadata(
dagrun, task, complete=True, task_instance=task_instance
)
+
+ end_date = task_instance.end_date if task_instance.end_date else
datetime.now()
+
self.adapter.complete_task(
run_id=task_uuid,
job_name=get_job_name(task),
- end_time=task_instance.end_date.isoformat(),
+ end_time=end_date.isoformat(),
task=task_metadata,
)
@@ -139,10 +149,12 @@ class OpenLineageListener:
dagrun, task, complete=True, task_instance=task_instance
)
+ end_date = task_instance.end_date if task_instance.end_date else
datetime.now()
+
self.adapter.fail_task(
run_id=task_uuid,
job_name=get_job_name(task),
- end_time=task_instance.end_date.isoformat(),
+ end_time=end_date.isoformat(),
task=task_metadata,
)
@@ -165,12 +177,14 @@ class OpenLineageListener:
if not self.executor:
self.log.error("Executor have not started before
`on_dag_run_running`")
return
+ data_interval_start = dag_run.data_interval_start.isoformat() if
dag_run.data_interval_start else None
+ data_interval_end = dag_run.data_interval_end.isoformat() if
dag_run.data_interval_end else None
self.executor.submit(
self.adapter.dag_started,
dag_run=dag_run,
msg=msg,
- nominal_start_time=dag_run.data_interval_start.isoformat(),
- nominal_end_time=dag_run.data_interval_end.isoformat(),
+ nominal_start_time=data_interval_start,
+ nominal_end_time=data_interval_end,
)
@hookimpl