This is an automated email from the ASF dual-hosted git repository. dimberman pushed a commit to branch fix-resource-backcompat in repository https://gitbox.apache.org/repos/asf/airflow.git
commit bbcbb711d20d966fd62beb6f52118eabbe4a27e0 Author: zikun <[email protected]> AuthorDate: Tue Jul 21 00:19:08 2020 +0800 TimeSensor should respect DAG timezone (#9882) (cherry picked from commit 9c518fe937f8dc5e35908be96bd075c4ff666755) --- UPDATING.md | 6 ++--- airflow/sensors/time_sensor.py | 2 +- tests/sensors/test_time_sensor.py | 52 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 4 deletions(-) diff --git a/UPDATING.md b/UPDATING.md index faa75fd..e2285df 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -63,12 +63,12 @@ https://developers.google.com/style/inclusive-documentation --> ## Airflow 1.10.13 -### TimeSensor will consider default_timezone setting. +### TimeSensor is now timezone aware Previously `TimeSensor` always compared the `target_time` with the current time in UTC. -Now it will compare `target_time` with the current time in the timezone set by `default_timezone` under the `core` -section of the config. +Now it will compare `target_time` with the current time in the timezone of the DAG, +defaulting to the `default_timezone` in the global config. ### Removed Kerberos support for HDFS hook diff --git a/airflow/sensors/time_sensor.py b/airflow/sensors/time_sensor.py index d26c32d..5c41c2c 100644 --- a/airflow/sensors/time_sensor.py +++ b/airflow/sensors/time_sensor.py @@ -37,4 +37,4 @@ class TimeSensor(BaseSensorOperator): def poke(self, context): self.log.info('Checking if the time (%s) has come', self.target_time) - return timezone.make_naive(timezone.utcnow()).time() > self.target_time + return timezone.make_naive(timezone.utcnow(), self.dag.timezone).time() > self.target_time diff --git a/tests/sensors/test_time_sensor.py b/tests/sensors/test_time_sensor.py new file mode 100644 index 0000000..c08bdd1 --- /dev/null +++ b/tests/sensors/test_time_sensor.py @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from datetime import datetime, time + +import pendulum +from parameterized import parameterized + +from airflow.models.dag import DAG +from airflow.sensors.time_sensor import TimeSensor +from airflow.utils import timezone +from tests.compat import patch + +DEFAULT_TIMEZONE = "Asia/Singapore" # UTC+08:00 +DEFAULT_DATE_WO_TZ = datetime(2015, 1, 1) +DEFAULT_DATE_WITH_TZ = datetime( + 2015, 1, 1, tzinfo=pendulum.tz.timezone(DEFAULT_TIMEZONE) +) + + +@patch( + "airflow.sensors.time_sensor.timezone.utcnow", + return_value=timezone.datetime(2020, 1, 1, 23, 0).replace(tzinfo=timezone.utc), +) +class TestTimeSensor: + @parameterized.expand( + [ + ("UTC", DEFAULT_DATE_WO_TZ, True), + ("UTC", DEFAULT_DATE_WITH_TZ, False), + (DEFAULT_TIMEZONE, DEFAULT_DATE_WO_TZ, False), + ] + ) + def test_timezone(self, mock_utcnow, default_timezone, start_date, expected): + with patch("airflow.settings.TIMEZONE", pendulum.timezone(default_timezone)): + dag = DAG("test", default_args={"start_date": start_date}) + op = TimeSensor(task_id="test", target_time=time(10, 0), dag=dag) + assert op.poke(None) == expected
