ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r651648816



##########
File path: airflow/timetables/base.py
##########
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Iterator, NamedTuple, Optional
+
+from pendulum import DateTime
+
+from airflow.typing_compat import Protocol
+
+
+class DataInterval(NamedTuple):
+    """A data interval for a DagRun to operate over.
+
+    The represented interval is ``[start, end)``.
+    """
+
+    start: DateTime
+    end: DateTime
+
+
+class TimeRestriction(NamedTuple):
+    """Restriction on when a DAG can be scheduled for a run.
+
+    Specifically, the run must not be earlier than ``earliest``, nor later than
+    ``latest``. If ``catchup`` is *False*, the run must also not be earlier than
+    the current time, i.e. "missed" schedules are not backfilled.
+
+    These values are generally set on the DAG or task's ``start_date``,
+    ``end_date``, and ``catchup`` arguments.
+
+    Both ``earliest`` and ``latest`` are inclusive; a DAG run can happen exactly
+    at either point of time.
+    """
+
+    earliest: Optional[DateTime]
+    latest: Optional[DateTime]
+    catchup: bool
+
+
+class DagRunInfo(NamedTuple):
+    """Information to schedule a DagRun.
+
+    Instances of this will be returned by TimeTables when they are asked to
+    schedule a DagRun creation.
+    """
+
+    run_after: DateTime
+    """The earliest time this DagRun is created and its tasks scheduled."""
+
+    data_interval: DataInterval
+    """The data interval this DagRun to operate over, if applicable."""
+
+    @classmethod
+    def exact(cls, at: DateTime) -> "DagRunInfo":
+        """Represent a run on an exact time."""
+        return cls(run_after=at, data_interval=DataInterval(at, at))
+
+    @classmethod
+    def interval(cls, start: DateTime, end: DateTime) -> "DagRunInfo":
+        """Represent a run on a continuous schedule.
+
+        In such a schedule, each data interval starts right after the previous
+        one ends, and each run is scheduled right after the interval ends. This
+        applies to all schedules prior to AIP-39 except ``@once`` and ``None``.
+        """
+        return cls(run_after=end, data_interval=DataInterval(start, end))
+
+
+class TimeTable(Protocol):

Review comment:
       Since this has an implementation (`iter_between`), should this just be a normal class?
   
   ```suggestion
   class TimeTable:
   ```

##########
File path: airflow/timetables/schedules.py
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import typing
+
+from cached_property import cached_property
+from croniter import CroniterBadCronError, CroniterBadDateError, croniter
+from dateutil.relativedelta import relativedelta
+from pendulum import DateTime
+from pendulum.tz.timezone import Timezone
+
+from airflow.exceptions import AirflowTimeTableInvalid
+from airflow.typing_compat import Protocol
+from airflow.utils.dates import cron_presets
+from airflow.utils.timezone import convert_to_utc, make_aware, make_naive
+
+Delta = typing.Union[datetime.timedelta, relativedelta]
+
+
+class Schedule(Protocol):
+    """Base protocol for schedules."""
+
+    def skip_to_latest(self, earliest: typing.Optional[DateTime]) -> DateTime:
+        """Bound the earliest time a run can be scheduled.
+
+        This is called when ``catchup=False``. See docstring of subclasses for
+        exact skipping behaviour of a schedule.
+        """
+        raise NotImplementedError()
+
+    def validate(self) -> None:
+        """Validate the time table is correctly specified.

Review comment:
       ```suggestion
           """Validate the timetable is correctly specified.
   ```

##########
File path: airflow/timetables/interval.py
##########
@@ -0,0 +1,92 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+
+from pendulum import DateTime
+from pendulum.tz.timezone import Timezone
+
+from airflow.timetables.base import DagRunInfo, TimeRestriction, TimeTable
+from airflow.timetables.schedules import CronSchedule, Delta, DeltaSchedule, Schedule
+
+
+class _DataIntervalTimeTable(TimeTable):
+    """Basis for time table implementations that schedule data intervals.
+
+    This kind of time tables create periodic data intervals from an underlying

Review comment:
       ```suggestion
       This kind of timetable creates periodic data intervals from an underlying
   ```

##########
File path: tests/test_utils/timetables.py
##########
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow import settings
+from airflow.timetables.interval import CronDataIntervalTimeTable, DeltaDataIntervalTimeTable
+
+
+def cron_time_table(expr: str) -> CronDataIntervalTimeTable:
+    return CronDataIntervalTimeTable(expr, settings.TIMEZONE)
+
+
+def delta_time_table(delta) -> DeltaDataIntervalTimeTable:

Review comment:
       ```suggestion
   def delta_timetable(delta) -> DeltaDataIntervalTimeTable:
   ```

##########
File path: airflow/models/dag.py
##########
@@ -630,30 +567,37 @@ def get_run_dates(self, start_date, end_date=None):
         :type start_date: datetime
         :param end_date: the end date of the interval, defaults to timezone.utcnow()
         :type end_date: datetime
+        :param align: whether the first run should be delayed to "align" with
+            the schedule, or can happen immediately at start_date (default: True
+            for top-level dags, False for subdags)
+        :type align: bool

Review comment:
       This is _forced_ to false for subdags -- do we need to set the default to be `align = None` maybe?

##########
File path: airflow/timetables/simple.py
##########
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+
+from pendulum import DateTime
+
+from airflow.timetables.base import DagRunInfo, TimeRestriction, TimeTable
+
+
+class NullTimeTable(TimeTable):
+    """Time table that never schedules anything.

Review comment:
       ```suggestion
       """Timetable that never schedules anything.
   ```

##########
File path: airflow/models/dag.py
##########
@@ -532,96 +503,62 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        next_info = self.time_table.next_dagrun_info(
+            timezone.coerce_datetime(date_last_automated_dagrun),
+            self._time_restriction,
+        )
+        if next_info is None:
             return (None, None)
-
-        if self.schedule_interval == "@once":
-            # For "@once" it can be created "now"
-            return (next_execution_date, next_execution_date)
-
-        return (next_execution_date, self.following_schedule(next_execution_date))
+        return (next_info.data_interval.start, next_info.run_after)
 
     def next_dagrun_after_date(self, date_last_automated_dagrun: Optional[pendulum.DateTime]):
-        """
-        Get the next execution date after the given ``date_last_automated_dagrun``, according to
-        schedule_interval, start_date, end_date etc.  This doesn't check max active run or any other
-        "concurrency" type limits, it only performs calculations based on the 
various date and interval fields
-        of this dag and it's tasks.
-
-        :param date_last_automated_dagrun: The execution_date of the last scheduler or
-            backfill triggered run for this dag
-        :type date_last_automated_dagrun: pendulum.Pendulum
-        """
-        if not self.schedule_interval or self.is_subdag:
-            return None
-
-        # don't schedule @once again
-        if self.schedule_interval == '@once' and date_last_automated_dagrun:
-            return None
-
-        # don't do scheduler catchup for dag's that don't have dag.catchup = True
-        if not (self.catchup or self.schedule_interval == '@once'):
-            # The logic is that we move start_date up until
-            # one period before, so that timezone.utcnow() is AFTER
-            # the period end, and the job can be created...
-            now = timezone.utcnow()
-            next_start = self.following_schedule(now)
-            last_start = self.previous_schedule(now)
-            if next_start <= now or isinstance(self.schedule_interval, timedelta):
-                new_start = last_start
-            else:
-                new_start = self.previous_schedule(last_start)
-
-            if self.start_date:
-                if new_start >= self.start_date:
-                    self.start_date = new_start
-            else:
-                self.start_date = new_start
-
-        next_run_date = None
-        if not date_last_automated_dagrun:
-            # First run
-            task_start_dates = [t.start_date for t in self.tasks if t.start_date]
-            if task_start_dates:
-                next_run_date = self.normalize_schedule(min(task_start_dates))
-                self.log.debug("Next run date based on tasks %s", next_run_date)
+        warnings.warn(
+            "`DAG.next_dagrun_after_date()` is deprecated. Please use 
`DAG.next_dagrun_info()` instead.",
+            category=DeprecationWarning,
+            stacklevel=2,
+        )
+        return self.next_dagrun_info(date_last_automated_dagrun)[0]
+
+    @cached_property
+    def _time_restriction(self) -> TimeRestriction:
+        start_dates = [t.start_date for t in self.tasks if t.start_date]
+        if self.start_date is not None:
+            start_dates.append(self.start_date)
+        if start_dates:
+            earliest = timezone.coerce_datetime(min(start_dates))
         else:
-            next_run_date = self.following_schedule(date_last_automated_dagrun)
-
-        if date_last_automated_dagrun and next_run_date:
-            while next_run_date <= date_last_automated_dagrun:
-                next_run_date = self.following_schedule(next_run_date)
-
-        # don't ever schedule prior to the dag's start_date
-        if self.start_date:
-            next_run_date = self.start_date if not next_run_date else max(next_run_date, self.start_date)
-            if next_run_date == self.start_date:
-                next_run_date = self.normalize_schedule(self.start_date)
-
-            self.log.debug("Dag start date: %s. Next run date: %s", self.start_date, next_run_date)
-
-        # Don't schedule a dag beyond its end_date (as specified by the dag param)
-        if next_run_date and self.end_date and next_run_date > self.end_date:
-            return None
-
-        # Don't schedule a dag beyond its end_date (as specified by the task params)
-        # Get the min task end date, which may come from the dag.default_args
-        task_end_dates = [t.end_date for t in self.tasks if t.end_date]
-        if task_end_dates and next_run_date:
-            min_task_end_date = min(task_end_dates)
-            if next_run_date > min_task_end_date:
-                return None
-
-        return next_run_date
-
-    def get_run_dates(self, start_date, end_date=None):
+            earliest = None
+        end_dates = [t.end_date for t in self.tasks if t.end_date]
+        if self.end_date is not None:
+            end_dates.append(self.end_date)
+        if end_dates:
+            latest = timezone.coerce_datetime(max(end_dates))
+        else:
+            latest = None
+        return TimeRestriction(earliest, latest, self.catchup)
+
+    @cached_property
+    def time_table(self) -> TimeTable:

Review comment:
       ```suggestion
       def timetable(self) -> TimeTable:
   ```
   
   Timetable is one word.
   
   (I may have missed a few instances of this in the PR)

##########
File path: airflow/timetables/simple.py
##########
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+
+from pendulum import DateTime
+
+from airflow.timetables.base import DagRunInfo, TimeRestriction, TimeTable
+
+
+class NullTimeTable(TimeTable):
+    """Time table that never schedules anything.
+
+    This corresponds to ``schedule_interval=None``.
+    """
+
+    def __eq__(self, other: Any) -> bool:
+        """As long as *other* is of the same type."""
+        if not isinstance(other, NullTimeTable):
+            return NotImplemented
+        return True
+
+    def validate(self) -> None:
+        pass
+
+    def next_dagrun_info(
+        self,
+        last_automated_dagrun: Optional[DateTime],
+        restriction: TimeRestriction,
+    ) -> Optional[DagRunInfo]:
+        return None
+
+
+class OnceTimeTable(TimeTable):
+    """Time table that schedules the execution once as soon as possible.

Review comment:
       ```suggestion
       """Timetable that schedules the execution one time only and to run  as 
soon as possible.
   ```

##########
File path: tests/test_utils/timetables.py
##########
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow import settings
+from airflow.timetables.interval import CronDataIntervalTimeTable, DeltaDataIntervalTimeTable
+
+
+def cron_time_table(expr: str) -> CronDataIntervalTimeTable:

Review comment:
       ```suggestion
   def cron_timetable(expr: str) -> CronDataIntervalTimeTable:
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to