This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 9c518fe TimeSensor should respect DAG timezone (#9882)
9c518fe is described below
commit 9c518fe937f8dc5e35908be96bd075c4ff666755
Author: zikun <[email protected]>
AuthorDate: Tue Jul 21 00:19:08 2020 +0800
TimeSensor should respect DAG timezone (#9882)
---
UPDATING.md | 5 ++--
airflow/sensors/time_sensor.py | 2 +-
tests/sensors/test_time_sensor.py | 52 +++++++++++++++++++++++++++++++++++++++
3 files changed, 56 insertions(+), 3 deletions(-)
diff --git a/UPDATING.md b/UPDATING.md
index b58eaf1..923db25 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -1475,11 +1475,12 @@ arguments, please change `store_serialized_dags` to `read_dags_from_db`.
Similarly, if you were using `DagBag().store_serialized_dags` property, change it to `DagBag().read_dags_from_db`.
-### TimeSensor will consider default_timezone setting.
+### TimeSensor is now timezone aware
Previously `TimeSensor` always compared the `target_time` with the current time in UTC.
-Now it will compare `target_time` with the current time in the timezone set by `default_timezone` under the `core` section of the config.
+Now it will compare `target_time` with the current time in the timezone of the DAG,
+defaulting to the `default_timezone` in the global config.
## Airflow 1.10.11
diff --git a/airflow/sensors/time_sensor.py b/airflow/sensors/time_sensor.py
index 69feaae..eb6846b 100644
--- a/airflow/sensors/time_sensor.py
+++ b/airflow/sensors/time_sensor.py
@@ -36,4 +36,4 @@ class TimeSensor(BaseSensorOperator):
def poke(self, context):
self.log.info('Checking if the time (%s) has come', self.target_time)
- return timezone.make_naive(timezone.utcnow()).time() > self.target_time
+ return timezone.make_naive(timezone.utcnow(), self.dag.timezone).time() > self.target_time
diff --git a/tests/sensors/test_time_sensor.py b/tests/sensors/test_time_sensor.py
new file mode 100644
index 0000000..827fa5e
--- /dev/null
+++ b/tests/sensors/test_time_sensor.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, time
+
+import pendulum
+from mock import patch
+from parameterized import parameterized
+
+from airflow.models.dag import DAG
+from airflow.sensors.time_sensor import TimeSensor
+from airflow.utils import timezone
+
+DEFAULT_TIMEZONE = "Asia/Singapore" # UTC+08:00
+DEFAULT_DATE_WO_TZ = datetime(2015, 1, 1)
+DEFAULT_DATE_WITH_TZ = datetime(
+ 2015, 1, 1, tzinfo=pendulum.tz.timezone(DEFAULT_TIMEZONE)
+)
+
+
+@patch(
+ "airflow.sensors.time_sensor.timezone.utcnow",
+ return_value=timezone.datetime(2020, 1, 1, 23, 0).replace(tzinfo=timezone.utc),
+)
+class TestTimeSensor:
+ @parameterized.expand(
+ [
+ ("UTC", DEFAULT_DATE_WO_TZ, True),
+ ("UTC", DEFAULT_DATE_WITH_TZ, False),
+ (DEFAULT_TIMEZONE, DEFAULT_DATE_WO_TZ, False),
+ ]
+ )
+ def test_timezone(self, mock_utcnow, default_timezone, start_date, expected):
+ with patch("airflow.settings.TIMEZONE", pendulum.timezone(default_timezone)):
+ dag = DAG("test", default_args={"start_date": start_date})
+ op = TimeSensor(task_id="test", target_time=time(10, 0), dag=dag)
+ assert op.poke(None) == expected