This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 12e17d1726 openlineage: replace dt.now with airflow.utils.timezone.utcnow (#40887)
12e17d1726 is described below
commit 12e17d172690b7620149d70e63577e13f5b9efe2
Author: Kacper Muda <[email protected]>
AuthorDate: Mon Jul 22 11:21:24 2024 +0200
openlineage: replace dt.now with airflow.utils.timezone.utcnow (#40887)
Signed-off-by: Kacper Muda <[email protected]>
---
airflow/providers/openlineage/plugins/listener.py | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 71005e37a7..8798c542c1 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -19,7 +19,6 @@ from __future__ import annotations
import logging
import os
from concurrent.futures import ProcessPoolExecutor
-from datetime import datetime
from typing import TYPE_CHECKING
import psutil
@@ -43,6 +42,7 @@ from airflow.providers.openlineage.utils.utils import (
)
from airflow.settings import configure_orm
from airflow.stats import Stats
+from airflow.utils import timezone
from airflow.utils.timeout import timeout
if TYPE_CHECKING:
@@ -145,7 +145,7 @@ class OpenLineageListener:
with Stats.timer(f"ol.extract.{event_type}.{operator_name}"):
task_metadata = self.extractor_manager.extract_metadata(dagrun, task)
- start_date = task_instance.start_date if task_instance.start_date else datetime.now()
+ start_date = task_instance.start_date if task_instance.start_date else timezone.utcnow()
data_interval_start = (
dagrun.data_interval_start.isoformat() if dagrun.data_interval_start else None
)
@@ -224,7 +224,7 @@ class OpenLineageListener:
dagrun, task, complete=True, task_instance=task_instance
)
- end_date = task_instance.end_date if task_instance.end_date else datetime.now()
+ end_date = task_instance.end_date if task_instance.end_date else timezone.utcnow()
redacted_event = self.adapter.complete_task(
run_id=task_uuid,
@@ -318,7 +318,7 @@ class OpenLineageListener:
dagrun, task, complete=True, task_instance=task_instance
)
- end_date = task_instance.end_date if task_instance.end_date else datetime.now()
+ end_date = task_instance.end_date if task_instance.end_date else timezone.utcnow()
redacted_event = self.adapter.fail_task(
run_id=task_uuid,