This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3e8782a485d647886642cfeac0f10c6f294d1bce
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Sat Nov 6 06:48:15 2021 +0800

    Fix Serialization when ``relativedelta`` is passed as ``schedule_interval`` (#19418)
    
    Also add relativedelta to timetable test cases to ensure this does not
    regress.
    
    Fix #19416
    
    (cherry picked from commit b590cc8a976da9fa7fe8c5850bd16d3dc856c52c)
---
 airflow/timetables/interval.py              |  5 ++-
 tests/timetables/test_interval_timetable.py | 66 +++++++++++++++++++++++++++--
 2 files changed, 66 insertions(+), 5 deletions(-)

diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py
index a095565..d669cb6 100644
--- a/airflow/timetables/interval.py
+++ b/airflow/timetables/interval.py
@@ -271,8 +271,9 @@ class DeltaDataIntervalTimetable(_DataIntervalTimetable):
         return {"delta": delta}
 
     def validate(self) -> None:
-        if self._delta.total_seconds() <= 0:
-            raise AirflowTimetableInvalid("schedule interval must be positive")
+        now = datetime.datetime.now()
+        if (now + self._delta) <= now:
+            raise AirflowTimetableInvalid(f"schedule interval must be positive, not {self._delta!r}")
 
     def _get_next(self, current: DateTime) -> DateTime:
         return convert_to_utc(current + self._delta)
diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py
index 53f5aeb..842cc1f2 100644
--- a/tests/timetables/test_interval_timetable.py
+++ b/tests/timetables/test_interval_timetable.py
@@ -18,10 +18,12 @@
 import datetime
 from typing import Optional
 
+import dateutil.relativedelta
 import freezegun
 import pendulum
 import pytest
 
+from airflow.exceptions import AirflowTimetableInvalid
 from airflow.settings import TIMEZONE
 from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
 from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable
@@ -35,12 +37,17 @@ PREV_DATA_INTERVAL = DataInterval(start=PREV_DATA_INTERVAL_START, end=PREV_DATA_
 CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE)
 
 HOURLY_CRON_TIMETABLE = CronDataIntervalTimetable("@hourly", TIMEZONE)
-HOURLY_DELTA_TIMETABLE = DeltaDataIntervalTimetable(datetime.timedelta(hours=1))
+HOURLY_TIMEDELTA_TIMETABLE = DeltaDataIntervalTimetable(datetime.timedelta(hours=1))
+HOURLY_RELATIVEDELTA_TIMETABLE = DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta(hours=1))
 
 
 @pytest.mark.parametrize(
     "timetable",
-    [pytest.param(HOURLY_CRON_TIMETABLE, id="cron"), pytest.param(HOURLY_DELTA_TIMETABLE, id="delta")],
+    [
+        pytest.param(HOURLY_CRON_TIMETABLE, id="cron"),
+        pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"),
+        pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"),
+    ],
 )
 @pytest.mark.parametrize(
     "last_automated_data_interval",
@@ -62,7 +69,11 @@ def test_no_catchup_next_info_starts_at_current_time(
 
 @pytest.mark.parametrize(
     "timetable",
-    [pytest.param(HOURLY_CRON_TIMETABLE, id="cron"), pytest.param(HOURLY_DELTA_TIMETABLE, id="delta")],
+    [
+        pytest.param(HOURLY_CRON_TIMETABLE, id="cron"),
+        pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"),
+        pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"),
+    ],
 )
 def test_catchup_next_info_starts_at_previous_interval_end(timetable: Timetable) -> None:
     """If ``catchup=True``, the next interval starts at the previous's end."""
@@ -72,3 +83,52 @@ def test_catchup_next_info_starts_at_previous_interval_end(timetable: Timetable)
     )
     expected_end = PREV_DATA_INTERVAL_END + datetime.timedelta(hours=1)
     assert next_info == DagRunInfo.interval(start=PREV_DATA_INTERVAL_END, end=expected_end)
+
+
+@pytest.mark.parametrize(
+    "timetable",
+    [
+        pytest.param(HOURLY_CRON_TIMETABLE, id="cron"),
+        pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"),
+        pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"),
+    ],
+)
+def test_validate_success(timetable: Timetable) -> None:
+    timetable.validate()
+
+
+@pytest.mark.parametrize(
+    "timetable, error_message",
+    [
+        pytest.param(
+            CronDataIntervalTimetable("0 0 1 13 0", TIMEZONE),
+            "[0 0 1 13 0] is not acceptable, out of range",
+            id="invalid-cron",
+        ),
+        pytest.param(
+            DeltaDataIntervalTimetable(datetime.timedelta()),
+            "schedule interval must be positive, not datetime.timedelta(0)",
+            id="zero-timedelta",
+        ),
+        pytest.param(
+            DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta()),
+            "schedule interval must be positive, not relativedelta()",
+            id="zero-relativedelta",
+        ),
+        pytest.param(
+            DeltaDataIntervalTimetable(datetime.timedelta(days=-1)),
+            # Dynamically formatted since different Python versions display timedelta differently.
+            f"schedule interval must be positive, not {datetime.timedelta(days=-1)!r}",
+            id="negative-timedelta",
+        ),
+        pytest.param(
+            DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta(days=-1)),
+            "schedule interval must be positive, not relativedelta(days=-1)",
+            id="negative-relativedelta",
+        ),
+    ],
+)
+def test_validate_failure(timetable: Timetable, error_message: str) -> None:
+    with pytest.raises(AirflowTimetableInvalid) as ctx:
+        timetable.validate()
+    assert str(ctx.value) == error_message

Reply via email to