This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 332406d  BugFix: ``TimeSensorAsync`` returns a naive datetime (#17875)
332406d is described below

commit 332406dae9f6b08de0d43576c4ed176eb49b8ed0
Author: Kaxil Naik <[email protected]>
AuthorDate: Mon Aug 30 19:03:06 2021 +0100

    BugFix: ``TimeSensorAsync`` returns a naive datetime (#17875)
    
    My fix in https://github.com/apache/airflow/pull/17748 was only partially
    correct and I missed one part. `TimeSensorAsync` passed a naive datetime,
    which failed when passed to `DateTimeTrigger`. This PR fixes it and adds a
    test to avoid regression.
    
    Error:
    
    ```
    [2021-08-27 23:31:11,508] {taskinstance.py:1657} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/opt/airflow/airflow/models/taskinstance.py", line 1296, in _run_raw_task
        self._prepare_and_execute_task_with_callbacks(context, task)
      File "/opt/airflow/airflow/models/taskinstance.py", line 1415, in _prepare_and_execute_task_with_callbacks
        result = self._execute_task(context, task_copy)
      File "/opt/airflow/airflow/models/taskinstance.py", line 1471, in _execute_task
        result = execute_callable(context=context)
      File "/opt/airflow/airflow/sensors/time_sensor.py", line 60, in execute
        trigger=DateTimeTrigger(moment=self.target_datetime),
      File "/opt/airflow/airflow/triggers/temporal.py", line 40, in __init__
        raise ValueError("You cannot pass naive datetimes")
    ValueError: You cannot pass naive datetimes
    ```
    
    Example DAG:
    
    ```python
    from datetime import timedelta, time
    
    from airflow import DAG
    from airflow.sensors.time_sensor import TimeSensorAsync
    from airflow.utils import dates, timezone
    
    with DAG(
        dag_id='example_date_time_async_operator',
        schedule_interval='0 0 * * *',
        start_date=dates.days_ago(2),
        dagrun_timeout=timedelta(minutes=60),
        tags=['example', 'example2', 'async'],
    ) as dag:
    
        TimeSensorAsync(task_id="test-2", target_time=time(0, 38, 0))
    
    ```
---
 airflow/sensors/time_sensor.py    |  7 ++++---
 tests/sensors/test_time_sensor.py | 22 +++++++++++++++++++++-
 2 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/airflow/sensors/time_sensor.py b/airflow/sensors/time_sensor.py
index 72416be..67f0602 100644
--- a/airflow/sensors/time_sensor.py
+++ b/airflow/sensors/time_sensor.py
@@ -51,9 +51,10 @@ class TimeSensorAsync(BaseSensorOperator):
     def __init__(self, *, target_time, **kwargs):
         super().__init__(**kwargs)
         self.target_time = target_time
-        current_time = timezone.make_naive(timezone.utcnow(), self.dag.timezone)
-        todays_date = current_time.date()
-        self.target_datetime = datetime.datetime.combine(todays_date, self.target_time, current_time.tzinfo)
+
+        self.target_datetime = timezone.coerce_datetime(
+            datetime.datetime.combine(datetime.datetime.today(), self.target_time)
+        )
 
     def execute(self, context):
         self.defer(
diff --git a/tests/sensors/test_time_sensor.py b/tests/sensors/test_time_sensor.py
index 567297d..b3ce2e8 100644
--- a/tests/sensors/test_time_sensor.py
+++ b/tests/sensors/test_time_sensor.py
@@ -19,11 +19,15 @@
 from datetime import datetime, time
 from unittest.mock import patch
 
+import freezegun
 import pendulum
+import pytest
 from parameterized import parameterized
 
+from airflow.exceptions import TaskDeferred
 from airflow.models.dag import DAG
-from airflow.sensors.time_sensor import TimeSensor
+from airflow.sensors.time_sensor import TimeSensor, TimeSensorAsync
+from airflow.triggers.temporal import DateTimeTrigger
 from airflow.utils import timezone
 
 DEFAULT_TIMEZONE = "Asia/Singapore"  # UTC+08:00
@@ -48,3 +52,19 @@ class TestTimeSensor:
             dag = DAG("test", default_args={"start_date": start_date})
             op = TimeSensor(task_id="test", target_time=time(10, 0), dag=dag)
             assert op.poke(None) == expected
+
+
+class TestTimeSensorAsync:
+    @freezegun.freeze_time("2020-07-07 00:00:00")
+    def test_task_is_deferred(self):
+        with DAG("test_task_is_deferred", start_date=timezone.datetime(2020, 1, 1, 23, 0)):
+            op = TimeSensorAsync(task_id="test", target_time=time(10, 0))
+        assert not timezone.is_naive(op.target_datetime)
+
+        with pytest.raises(TaskDeferred) as exc_info:
+            op.execute({})
+
+        assert isinstance(exc_info.value.trigger, DateTimeTrigger)
+        assert exc_info.value.trigger.moment == timezone.datetime(2020, 7, 7, 10)
+        assert exc_info.value.method_name == "execute_complete"
+        assert exc_info.value.kwargs is None

Reply via email to