[AIRFLOW-1806] Use naive datetime for cron scheduling. Converting to naive local time (in the DAG's timezone) is required so that cron schedules run at the exact wall-clock times specified. E.g. if you schedule a DAG to run at 8:00pm every day, you do not want it to suddenly run at 7:00pm because of a DST change.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/dcac3e97 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/dcac3e97 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/dcac3e97 Branch: refs/heads/master Commit: dcac3e97a4e1b4429e4baf9d8ab2a7eb4139ad74 Parents: 2f16863 Author: Bolke de Bruin <[email protected]> Authored: Sat Nov 11 13:38:59 2017 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Mon Nov 27 15:54:27 2017 +0100 ---------------------------------------------------------------------- airflow/models.py | 20 +++++++++-- airflow/utils/timezone.py | 70 +++++++++++++++++++++++++++++++++++++++ tests/utils/test_timezone.py | 19 +++++++++++ 3 files changed, 107 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dcac3e97/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index f8a5f0f..33f3636 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -46,6 +46,8 @@ import textwrap import traceback import warnings import hashlib + +from datetime import datetime from urllib.parse import urlparse from sqlalchemy import ( @@ -2996,16 +2998,30 @@ class DAG(BaseDag, LoggingMixin): num=num, delta=self._schedule_interval) def following_schedule(self, dttm): + """ + Calculates the following schedule for this dag in local time + :param dttm: utc datetime + :return: utc datetime + """ if isinstance(self._schedule_interval, six.string_types): + dttm = timezone.make_naive(dttm, self.timezone) cron = croniter(self._schedule_interval, dttm) - return cron.get_next(datetime) + following = timezone.make_aware(cron.get_next(datetime), self.timezone) + return timezone.convert_to_utc(following) elif isinstance(self._schedule_interval, timedelta): 
return dttm + self._schedule_interval def previous_schedule(self, dttm): + """ + Calculates the previous schedule for this dag in local time + :param dttm: utc datetime + :return: utc datetime + """ if isinstance(self._schedule_interval, six.string_types): + dttm = timezone.make_naive(dttm, self.timezone) cron = croniter(self._schedule_interval, dttm) - return cron.get_prev(datetime) + prev = timezone.make_aware(cron.get_prev(datetime), self.timezone) + return timezone.convert_to_utc(prev) elif isinstance(self._schedule_interval, timedelta): return dttm - self._schedule_interval http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dcac3e97/airflow/utils/timezone.py ---------------------------------------------------------------------- diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py index b8fe89e..5ae7802 100644 --- a/airflow/utils/timezone.py +++ b/airflow/utils/timezone.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import datetime as dt import pendulum from airflow.settings import TIMEZONE @@ -66,3 +67,72 @@ def convert_to_utc(value): value = pendulum.instance(value, TIMEZONE) return value.astimezone(utc) + + +def make_aware(value, timezone=None): + """ + Make a naive datetime.datetime in a given time zone aware. + + :param value: datetime + :param timezone: timezone + :return: localized datetime in settings.TIMEZONE or timezone + + """ + if timezone is None: + timezone = TIMEZONE + + # Check that we won't overwrite the timezone of an aware datetime. + if is_localized(value): + raise ValueError( + "make_aware expects a naive datetime, got %s" % value) + + if hasattr(timezone, 'localize'): + # This method is available for pytz time zones. + return timezone.localize(value) + elif hasattr(timezone, 'convert'): + # For pendulum + return timezone.convert(value) + else: + # This may be wrong around DST changes! 
+ return value.replace(tzinfo=timezone) + + +def make_naive(value, timezone=None): + """ + Make an aware datetime.datetime naive in a given time zone. + + :param value: datetime + :param timezone: timezone + :return: naive datetime + """ + if timezone is None: + timezone = TIMEZONE + + # Emulate the behavior of astimezone() on Python < 3.6. + if is_naive(value): + raise ValueError("make_naive() cannot be applied to a naive datetime") + + o = value.astimezone(timezone) + + # cross library compatibility + naive = dt.datetime(o.year, + o.month, + o.day, + o.hour, + o.minute, + o.second, + o.microsecond) + + return naive + + +def datetime(*args, **kwargs): + """ + Wrapper around datetime.datetime that adds settings.TIMEZONE if tzinfo not specified + + :return: datetime.datetime + """ + if 'tzinfo' not in kwargs: + kwargs['tzinfo'] = TIMEZONE + + return dt.datetime(*args, **kwargs) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dcac3e97/tests/utils/test_timezone.py ---------------------------------------------------------------------- diff --git a/tests/utils/test_timezone.py b/tests/utils/test_timezone.py index 778c772..3d4cc7c 100644 --- a/tests/utils/test_timezone.py +++ b/tests/utils/test_timezone.py @@ -46,3 +46,22 @@ class TimezoneTest(unittest.TestCase): eat = datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT) utc = datetime.datetime(2011, 9, 1, 10, 20, 30, tzinfo=UTC) self.assertEquals(utc, timezone.convert_to_utc(eat)) + + def test_make_naive(self): + self.assertEqual( + timezone.make_naive(datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT), EAT), + datetime.datetime(2011, 9, 1, 13, 20, 30)) + self.assertEqual( + timezone.make_naive(datetime.datetime(2011, 9, 1, 17, 20, 30, tzinfo=ICT), EAT), + datetime.datetime(2011, 9, 1, 13, 20, 30)) + + with self.assertRaises(ValueError): + timezone.make_naive(datetime.datetime(2011, 9, 1, 13, 20, 30), EAT) + + def test_make_aware(self): + self.assertEqual( + timezone.make_aware(datetime.datetime(2011, 
9, 1, 13, 20, 30), EAT), + datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT)) + with self.assertRaises(ValueError): + timezone.make_aware(datetime.datetime(2011, 9, 1, 13, 20, 30, tzinfo=EAT), EAT) +
