This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 36cb4f3c7da bug fix: DateTimeSensor can't render jinja template if use
native obj (#50744)
36cb4f3c7da is described below
commit 36cb4f3c7daa0e3ac7524d826d61d8f0b320be97
Author: Aaron Chen <[email protected]>
AuthorDate: Mon May 26 17:52:03 2025 -0700
bug fix: DateTimeSensor can't render jinja template if use native obj
(#50744)
* fix bug: can't render jinja template if use native obj
* re-order `from airflow.utils import timezone`
* make `_moment` as a property
* fix unit test
* remove the unnecessary pendulum object
---
.../providers/standard/sensors/date_time.py | 14 ++++++---
.../tests/unit/standard/sensors/test_date_time.py | 33 ++++++++++++++++++++++
2 files changed, 43 insertions(+), 4 deletions(-)
diff --git
a/providers/standard/src/airflow/providers/standard/sensors/date_time.py
b/providers/standard/src/airflow/providers/standard/sensors/date_time.py
index d04f524cbab..b1c5b5da297 100644
--- a/providers/standard/src/airflow/providers/standard/sensors/date_time.py
+++ b/providers/standard/src/airflow/providers/standard/sensors/date_time.py
@@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Any, NoReturn
from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.sensors.base import BaseSensorOperator
+from airflow.utils import timezone
try:
from airflow.triggers.base import StartTriggerArgs
@@ -41,8 +42,6 @@ except ImportError:
timeout: datetime.timedelta | None = None
-from airflow.utils import timezone
-
if TYPE_CHECKING:
try:
from airflow.sdk.definitions.context import Context
@@ -99,6 +98,13 @@ class DateTimeSensor(BaseSensorOperator):
self.log.info("Checking if the time (%s) has come", self.target_time)
return timezone.utcnow() > timezone.parse(self.target_time)
+ @property
+ def _moment(self) -> datetime.datetime:
+ if isinstance(self.target_time, datetime.datetime):
+ return self.target_time
+
+ return timezone.parse(self.target_time)
+
class DateTimeSensorAsync(DateTimeSensor):
"""
@@ -145,11 +151,11 @@ class DateTimeSensorAsync(DateTimeSensor):
self.defer(
method_name="execute_complete",
trigger=DateTimeTrigger(
- moment=timezone.parse(self.target_time),
+ moment=self._moment,
end_from_trigger=self.end_from_trigger,
)
if AIRFLOW_V_3_0_PLUS
- else DateTimeTrigger(moment=timezone.parse(self.target_time)),
+ else DateTimeTrigger(moment=self._moment),
)
def execute_complete(self, context: Context, event: Any = None) -> None:
diff --git a/providers/standard/tests/unit/standard/sensors/test_date_time.py
b/providers/standard/tests/unit/standard/sensors/test_date_time.py
index c51b5316206..188237a957b 100644
--- a/providers/standard/tests/unit/standard/sensors/test_date_time.py
+++ b/providers/standard/tests/unit/standard/sensors/test_date_time.py
@@ -19,8 +19,10 @@ from __future__ import annotations
from unittest.mock import patch
+import pendulum
import pytest
+from airflow import macros
from airflow.models.dag import DAG
from airflow.providers.standard.sensors.date_time import DateTimeSensor
from airflow.utils import timezone
@@ -90,3 +92,34 @@ class TestDateTimeSensor:
def test_poke(self, mock_utcnow, task_id, target_time, expected):
op = DateTimeSensor(task_id=task_id, target_time=target_time,
dag=self.dag)
assert op.poke(None) == expected
+
+ @pytest.mark.parametrize(
+ "native, target_time, expected_type",
+ [
+ (False, "2025-01-01T00:00:00+00:00", pendulum.DateTime),
+ (True, "{{ data_interval_end }}", pendulum.DateTime),
+ (False, pendulum.datetime(2025, 1, 1, tz="UTC"),
pendulum.DateTime),
+ ],
+ )
+ def test_moment(self, native, target_time, expected_type):
+ dag = DAG(
+ dag_id="moment_dag",
+ start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
+ schedule=None,
+ render_template_as_native_obj=native,
+ )
+
+ sensor = DateTimeSensor(
+ task_id="moment",
+ target_time=target_time,
+ dag=dag,
+ )
+
+ ctx = {
+ "data_interval_end": pendulum.datetime(2025, 1, 1, tz="UTC"),
+ "macros": macros,
+ "dag": dag,
+ }
+ sensor.render_template_fields(ctx)
+
+ assert isinstance(sensor._moment, expected_type)