This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 332406d BugFix: ``TimeSensorAsync`` returns a naive datetime (#17875)
332406d is described below
commit 332406dae9f6b08de0d43576c4ed176eb49b8ed0
Author: Kaxil Naik <[email protected]>
AuthorDate: Mon Aug 30 19:03:06 2021 +0100
BugFix: ``TimeSensorAsync`` returns a naive datetime (#17875)
My fix in https://github.com/apache/airflow/pull/17748 was only partially
correct; I missed one part. `TimeSensorAsync` passed a naive datetime, which
failed when passed to `DateTimeTrigger`. This PR fixes it and adds a test to
avoid a regression.
Error:
```
[2021-08-27 23:31:11,508] {taskinstance.py:1657} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/taskinstance.py", line 1296, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/opt/airflow/airflow/models/taskinstance.py", line 1415, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/opt/airflow/airflow/models/taskinstance.py", line 1471, in _execute_task
    result = execute_callable(context=context)
  File "/opt/airflow/airflow/sensors/time_sensor.py", line 60, in execute
    trigger=DateTimeTrigger(moment=self.target_datetime),
  File "/opt/airflow/airflow/triggers/temporal.py", line 40, in __init__
    raise ValueError("You cannot pass naive datetimes")
ValueError: You cannot pass naive datetimes
```
Example DAG:
```python
from datetime import timedelta, time

from airflow import DAG
from airflow.sensors.time_sensor import TimeSensorAsync
from airflow.utils import dates, timezone

with DAG(
    dag_id='example_date_time_async_operator',
    schedule_interval='0 0 * * *',
    start_date=dates.days_ago(2),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example', 'example2', 'async'],
) as dag:
    TimeSensorAsync(task_id="test-2", target_time=time(0, 38, 0))
```
---
airflow/sensors/time_sensor.py | 7 ++++---
tests/sensors/test_time_sensor.py | 22 +++++++++++++++++++++-
2 files changed, 25 insertions(+), 4 deletions(-)
diff --git a/airflow/sensors/time_sensor.py b/airflow/sensors/time_sensor.py
index 72416be..67f0602 100644
--- a/airflow/sensors/time_sensor.py
+++ b/airflow/sensors/time_sensor.py
@@ -51,9 +51,10 @@ class TimeSensorAsync(BaseSensorOperator):
def __init__(self, *, target_time, **kwargs):
super().__init__(**kwargs)
self.target_time = target_time
- current_time = timezone.make_naive(timezone.utcnow(),
self.dag.timezone)
- todays_date = current_time.date()
- self.target_datetime = datetime.datetime.combine(todays_date,
self.target_time, current_time.tzinfo)
+
+ self.target_datetime = timezone.coerce_datetime(
+ datetime.datetime.combine(datetime.datetime.today(),
self.target_time)
+ )
def execute(self, context):
self.defer(
diff --git a/tests/sensors/test_time_sensor.py
b/tests/sensors/test_time_sensor.py
index 567297d..b3ce2e8 100644
--- a/tests/sensors/test_time_sensor.py
+++ b/tests/sensors/test_time_sensor.py
@@ -19,11 +19,15 @@
from datetime import datetime, time
from unittest.mock import patch
+import freezegun
import pendulum
+import pytest
from parameterized import parameterized
+from airflow.exceptions import TaskDeferred
from airflow.models.dag import DAG
-from airflow.sensors.time_sensor import TimeSensor
+from airflow.sensors.time_sensor import TimeSensor, TimeSensorAsync
+from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone
DEFAULT_TIMEZONE = "Asia/Singapore" # UTC+08:00
@@ -48,3 +52,19 @@ class TestTimeSensor:
dag = DAG("test", default_args={"start_date": start_date})
op = TimeSensor(task_id="test", target_time=time(10, 0), dag=dag)
assert op.poke(None) == expected
+
+
+class TestTimeSensorAsync:
+ @freezegun.freeze_time("2020-07-07 00:00:00")
+ def test_task_is_deferred(self):
+ with DAG("test_task_is_deferred", start_date=timezone.datetime(2020,
1, 1, 23, 0)):
+ op = TimeSensorAsync(task_id="test", target_time=time(10, 0))
+ assert not timezone.is_naive(op.target_datetime)
+
+ with pytest.raises(TaskDeferred) as exc_info:
+ op.execute({})
+
+ assert isinstance(exc_info.value.trigger, DateTimeTrigger)
+ assert exc_info.value.trigger.moment == timezone.datetime(2020, 7, 7,
10)
+ assert exc_info.value.method_name == "execute_complete"
+ assert exc_info.value.kwargs is None