This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 23d5076635 openlineage: defensively check for provided datetimes in listener (#33343)
23d5076635 is described below

commit 23d507663541ab49f02d7863d42f9baf458cc48f
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Sun Aug 13 18:17:47 2023 +0200

    openlineage: defensively check for provided datetimes in listener (#33343)
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
---
 airflow/providers/openlineage/plugins/adapter.py  |  4 ++--
 airflow/providers/openlineage/plugins/listener.py | 28 +++++++++++++++++------
 2 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py
index 0f19aa0f14..f470631b07 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -127,8 +127,8 @@ class OpenLineageAdapter(LoggingMixin):
         parent_job_name: str | None,
         parent_run_id: str | None,
         code_location: str | None,
-        nominal_start_time: str,
-        nominal_end_time: str,
+        nominal_start_time: str | None,
+        nominal_end_time: str | None,
         owners: list[str],
         task: OperatorLineage | None,
         run_facets: dict[str, BaseFacet] | None = None,  # Custom run facets
diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 99394863f5..d85a559f56 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -18,6 +18,7 @@ from __future__ import annotations
 
 import logging
 from concurrent.futures import Executor, ThreadPoolExecutor
+from datetime import datetime
 from typing import TYPE_CHECKING
 
 from airflow.listeners import hookimpl
@@ -76,16 +77,22 @@ class OpenLineageListener:
 
             task_metadata = self.extractor_manager.extract_metadata(dagrun, 
task)
 
+            start_date = task_instance.start_date if task_instance.start_date 
else datetime.now()
+            data_interval_start = (
+                dagrun.data_interval_start.isoformat() if 
dagrun.data_interval_start else None
+            )
+            data_interval_end = dagrun.data_interval_end.isoformat() if 
dagrun.data_interval_end else None
+
             self.adapter.start_task(
                 run_id=task_uuid,
                 job_name=get_job_name(task),
                 job_description=dag.description,
-                event_time=task_instance.start_date.isoformat(),
+                event_time=start_date.isoformat(),
                 parent_job_name=dag.dag_id,
                 parent_run_id=parent_run_id,
                 code_location=None,
-                nominal_start_time=dagrun.data_interval_start.isoformat(),
-                nominal_end_time=dagrun.data_interval_end.isoformat(),
+                nominal_start_time=data_interval_start,
+                nominal_end_time=data_interval_end,
                 owners=dag.owner.split(", "),
                 task=task_metadata,
                 run_facets={
@@ -113,10 +120,13 @@ class OpenLineageListener:
             task_metadata = self.extractor_manager.extract_metadata(
                 dagrun, task, complete=True, task_instance=task_instance
             )
+
+            end_date = task_instance.end_date if task_instance.end_date else 
datetime.now()
+
             self.adapter.complete_task(
                 run_id=task_uuid,
                 job_name=get_job_name(task),
-                end_time=task_instance.end_date.isoformat(),
+                end_time=end_date.isoformat(),
                 task=task_metadata,
             )
 
@@ -139,10 +149,12 @@ class OpenLineageListener:
                 dagrun, task, complete=True, task_instance=task_instance
             )
 
+            end_date = task_instance.end_date if task_instance.end_date else 
datetime.now()
+
             self.adapter.fail_task(
                 run_id=task_uuid,
                 job_name=get_job_name(task),
-                end_time=task_instance.end_date.isoformat(),
+                end_time=end_date.isoformat(),
                 task=task_metadata,
             )
 
@@ -165,12 +177,14 @@ class OpenLineageListener:
         if not self.executor:
             self.log.error("Executor have not started before 
`on_dag_run_running`")
             return
+        data_interval_start = dag_run.data_interval_start.isoformat() if 
dag_run.data_interval_start else None
+        data_interval_end = dag_run.data_interval_end.isoformat() if 
dag_run.data_interval_end else None
         self.executor.submit(
             self.adapter.dag_started,
             dag_run=dag_run,
             msg=msg,
-            nominal_start_time=dag_run.data_interval_start.isoformat(),
-            nominal_end_time=dag_run.data_interval_end.isoformat(),
+            nominal_start_time=data_interval_start,
+            nominal_end_time=data_interval_end,
         )
 
     @hookimpl

Reply via email to