[ 
https://issues.apache.org/jira/browse/AIRFLOW-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16674986#comment-16674986
 ] 

ASF GitHub Bot commented on AIRFLOW-3277:
-----------------------------------------

ashb closed pull request #4117: [AIRFLOW-3277] Correctly observe DST transitions for cron
URL: https://github.com/apache/incubator-airflow/pull/4117
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/models.py b/airflow/models.py
index 82b87c6d53..d0fb993da8 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -48,6 +48,7 @@
 import logging
 import numbers
 import os
+import pendulum
 import pickle
 import re
 import signal
@@ -3445,33 +3446,76 @@ def date_range(self, start_date, num=None, end_date=timezone.utcnow()):
             start_date=start_date, end_date=end_date,
             num=num, delta=self._schedule_interval)
 
+    def is_fixed_time_schedule(self):
+        """
+        Figures out if the DAG schedule has a fixed time (e.g. 3 AM).
+
+        :return: True if the schedule has a fixed time, False if not.
+        """
+        now = datetime.now()
+        cron = croniter(self._schedule_interval, now)
+
+        start = cron.get_next(datetime)
+        cron_next = cron.get_next(datetime)
+
+        if cron_next.minute == start.minute and cron_next.hour == start.hour:
+            return True
+
+        return False
+
     def following_schedule(self, dttm):
         """
-        Calculates the following schedule for this dag in local time
+        Calculates the following schedule for this dag in UTC.
 
         :param dttm: utc datetime
         :return: utc datetime
         """
         if isinstance(self._schedule_interval, six.string_types):
-            dttm = timezone.make_naive(dttm, self.timezone)
-            cron = croniter(self._schedule_interval, dttm)
-            following = timezone.make_aware(cron.get_next(datetime), self.timezone)
+            # we don't want to rely on the transitions created by
+            # croniter as they are not always correct
+            dttm = pendulum.instance(dttm)
+            naive = timezone.make_naive(dttm, self.timezone)
+            cron = croniter(self._schedule_interval, naive)
+
+            # We assume that DST transitions happen on the minute/hour
+            if not self.is_fixed_time_schedule():
+                # relative offset (eg. every 5 minutes)
+                delta = cron.get_next(datetime) - naive
+                following = dttm.in_timezone(self.timezone).add_timedelta(delta)
+            else:
+                # absolute (e.g. 3 AM)
+                naive = cron.get_next(datetime)
+                tz = pendulum.timezone(self.timezone.name)
+                following = timezone.make_aware(naive, tz)
             return timezone.convert_to_utc(following)
         elif self._schedule_interval is not None:
             return dttm + self._schedule_interval
 
     def previous_schedule(self, dttm):
         """
-        Calculates the previous schedule for this dag in local time
+        Calculates the previous schedule for this dag in UTC
 
         :param dttm: utc datetime
         :return: utc datetime
         """
         if isinstance(self._schedule_interval, six.string_types):
-            dttm = timezone.make_naive(dttm, self.timezone)
-            cron = croniter(self._schedule_interval, dttm)
-            prev = timezone.make_aware(cron.get_prev(datetime), self.timezone)
-            return timezone.convert_to_utc(prev)
+            # we don't want to rely on the transitions created by
+            # croniter as they are not always correct
+            dttm = pendulum.instance(dttm)
+            naive = timezone.make_naive(dttm, self.timezone)
+            cron = croniter(self._schedule_interval, naive)
+
+            # We assume that DST transitions happen on the minute/hour
+            if not self.is_fixed_time_schedule():
+                # relative offset (eg. every 5 minutes)
+                delta = naive - cron.get_prev(datetime)
+                previous = dttm.in_timezone(self.timezone).subtract_timedelta(delta)
+            else:
+                # absolute (e.g. 3 AM)
+                naive = cron.get_prev(datetime)
+                tz = pendulum.timezone(self.timezone.name)
+                previous = timezone.make_aware(naive, tz)
+            return timezone.convert_to_utc(previous)
         elif self._schedule_interval is not None:
             return dttm - self._schedule_interval
 
diff --git a/tests/models.py b/tests/models.py
index d4cb738386..c14b42c86e 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -580,6 +580,96 @@ def test_cycle(self):
         with self.assertRaises(AirflowDagCycleException):
             dag.test_cycle()
 
+    def test_following_previous_schedule(self):
+        """
+        Make sure DST transitions are properly observed
+        """
+        local_tz = pendulum.timezone('Europe/Zurich')
+        start = local_tz.convert(datetime.datetime(2018, 10, 28, 2, 55),
+                                 dst_rule=pendulum.PRE_TRANSITION)
+        self.assertEqual(start.isoformat(), "2018-10-28T02:55:00+02:00",
+                         "Pre-condition: start date is in DST")
+
+        utc = timezone.convert_to_utc(start)
+
+        dag = DAG('tz_dag', start_date=start, schedule_interval='*/5 * * * *')
+        _next = dag.following_schedule(utc)
+        next_local = local_tz.convert(_next)
+
+        self.assertEqual(_next.isoformat(), "2018-10-28T01:00:00+00:00")
+        self.assertEqual(next_local.isoformat(), "2018-10-28T02:00:00+01:00")
+
+        prev = dag.previous_schedule(utc)
+        prev_local = local_tz.convert(prev)
+
+        self.assertEqual(prev_local.isoformat(), "2018-10-28T02:50:00+02:00")
+
+        prev = dag.previous_schedule(_next)
+        prev_local = local_tz.convert(prev)
+
+        self.assertEqual(prev_local.isoformat(), "2018-10-28T02:55:00+02:00")
+        self.assertEqual(prev, utc)
+
+    def test_following_previous_schedule_daily_dag_CEST_to_CET(self):
+        """
+        Make sure DST transitions are properly observed
+        """
+        local_tz = pendulum.timezone('Europe/Zurich')
+        start = local_tz.convert(datetime.datetime(2018, 10, 27, 3),
+                                 dst_rule=pendulum.PRE_TRANSITION)
+
+        utc = timezone.convert_to_utc(start)
+
+        dag = DAG('tz_dag', start_date=start, schedule_interval='0 3 * * *')
+
+        prev = dag.previous_schedule(utc)
+        prev_local = local_tz.convert(prev)
+
+        self.assertEqual(prev_local.isoformat(), "2018-10-26T03:00:00+02:00")
+        self.assertEqual(prev.isoformat(), "2018-10-26T01:00:00+00:00")
+
+        _next = dag.following_schedule(utc)
+        next_local = local_tz.convert(_next)
+
+        self.assertEqual(next_local.isoformat(), "2018-10-28T03:00:00+01:00")
+        self.assertEqual(_next.isoformat(), "2018-10-28T02:00:00+00:00")
+
+        prev = dag.previous_schedule(_next)
+        prev_local = local_tz.convert(prev)
+
+        self.assertEqual(prev_local.isoformat(), "2018-10-27T03:00:00+02:00")
+        self.assertEqual(prev.isoformat(), "2018-10-27T01:00:00+00:00")
+
+    def test_following_previous_schedule_daily_dag_CET_to_CEST(self):
+        """
+        Make sure DST transitions are properly observed
+        """
+        local_tz = pendulum.timezone('Europe/Zurich')
+        start = local_tz.convert(datetime.datetime(2018, 3, 25, 2),
+                                 dst_rule=pendulum.PRE_TRANSITION)
+
+        utc = timezone.convert_to_utc(start)
+
+        dag = DAG('tz_dag', start_date=start, schedule_interval='0 3 * * *')
+
+        prev = dag.previous_schedule(utc)
+        prev_local = local_tz.convert(prev)
+
+        self.assertEqual(prev_local.isoformat(), "2018-03-24T03:00:00+01:00")
+        self.assertEqual(prev.isoformat(), "2018-03-24T02:00:00+00:00")
+
+        _next = dag.following_schedule(utc)
+        next_local = local_tz.convert(_next)
+
+        self.assertEqual(next_local.isoformat(), "2018-03-25T03:00:00+02:00")
+        self.assertEqual(_next.isoformat(), "2018-03-25T01:00:00+00:00")
+
+        prev = dag.previous_schedule(_next)
+        prev_local = local_tz.convert(prev)
+
+        self.assertEqual(prev_local.isoformat(), "2018-03-24T03:00:00+01:00")
+        self.assertEqual(prev.isoformat(), "2018-03-24T02:00:00+00:00")
+
     @patch('airflow.models.timezone.utcnow')
     def test_sync_to_db(self, mock_now):
         dag = DAG(


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Invalid timezone transition handling for cron schedules
> -------------------------------------------------------
>
>                 Key: AIRFLOW-3277
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3277
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: 1.10.0
>            Reporter: Bolke de Bruin
>            Priority: Blocker
>             Fix For: 1.10.1
>
>
> `following_schedule` converts to naive time using the local time zone. In
> case of a DST transition, say 3AM -> 2AM ("summer time to winter time"), we
> generate datetimes that could overlap with earlier schedules. Therefore a
> DAG that should run every 5 minutes will not do so if it has already seen the
> schedule.
> We should not convert to naive time; instead, we should keep the datetimes in UTC.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to