ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r635955157
##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
"automated" DagRuns for this dag (scheduled or backfill, but not
manual)
"""
- if (
- self.schedule_interval == "@once" and date_last_automated_dagrun
- ) or self.schedule_interval is None:
- # Manual trigger, or already created the run for @once, can short
circuit
+ # XXX: The timezone.coerce_datetime calls in this function should not
+ # be necessary since the function annotation suggests it only accepts
+ # pendulum.DateTime, and someone is passing datetime.datetime into this
+ # function. We should fix whatever is doing that.
+ if self.is_subdag:
return (None, None)
- next_execution_date =
self.next_dagrun_after_date(date_last_automated_dagrun)
-
- if next_execution_date is None:
+ time_table: TimeTable = self.time_table
+ restriction = self._format_time_restriction()
+ if not self.catchup:
+ restriction = time_table.cancel_catchup(restriction)
Review comment:
`not` ... `cancel` is a bit hard for me to follow (it reads like a double
negative).
How about this as an alternative name?
```suggestion
if not self.catchup:
restriction = time_table.skip_to_latest(restriction)
```
An additional thought: this should probably be handled _inside_
`time_table.next_dagrun_info`, so how about we change the signature of that to:
```python
def next_dagrun_info(
self,
last_automated_dagrun: Optional[DateTime],
between: TimeRestriction,
catchup: bool,
) -> Optional[DagRunInfo]:
```
I think passing in `catchup` and letting the timetable handle it also
means we don't need `cancel_catchup` as part of the protocol.
##########
File path: airflow/models/dag.py
##########
@@ -44,6 +44,7 @@
cast,
)
+import cached_property
Review comment:
We've been using `functools.cached_property` where available (py 3.8+).
(We should probably put that in `typing_compat` or a similar helper module so we
don't need the try/except in every file that uses it.)
##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
"automated" DagRuns for this dag (scheduled or backfill, but not
manual)
"""
- if (
- self.schedule_interval == "@once" and date_last_automated_dagrun
- ) or self.schedule_interval is None:
- # Manual trigger, or already created the run for @once, can short
circuit
+ # XXX: The timezone.coerce_datetime calls in this function should not
+ # be necessary since the function annotation suggests it only accepts
+ # pendulum.DateTime, and someone is passing datetime.datetime into this
+ # function. We should fix whatever is doing that.
+ if self.is_subdag:
return (None, None)
- next_execution_date =
self.next_dagrun_after_date(date_last_automated_dagrun)
-
- if next_execution_date is None:
+ time_table: TimeTable = self.time_table
Review comment:
Is the type needed here? The property already specifies the type 🤔
##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
"automated" DagRuns for this dag (scheduled or backfill, but not
manual)
"""
- if (
- self.schedule_interval == "@once" and date_last_automated_dagrun
- ) or self.schedule_interval is None:
- # Manual trigger, or already created the run for @once, can short
circuit
+ # XXX: The timezone.coerce_datetime calls in this function should not
+ # be necessary since the function annotation suggests it only accepts
+ # pendulum.DateTime, and someone is passing datetime.datetime into this
+ # function. We should fix whatever is doing that.
+ if self.is_subdag:
return (None, None)
- next_execution_date =
self.next_dagrun_after_date(date_last_automated_dagrun)
-
- if next_execution_date is None:
+ time_table: TimeTable = self.time_table
+ restriction = self._format_time_restriction()
+ if not self.catchup:
+ restriction = time_table.cancel_catchup(restriction)
+ next_info = time_table.next_dagrun_info(
+ timezone.coerce_datetime(date_last_automated_dagrun),
+ restriction,
+ )
+ if next_info is None:
return (None, None)
-
- if self.schedule_interval == "@once":
- # For "@once" it can be created "now"
- return (next_execution_date, next_execution_date)
-
- return (next_execution_date,
self.following_schedule(next_execution_date))
-
- def next_dagrun_after_date(self, date_last_automated_dagrun:
Optional[pendulum.DateTime]):
- """
- Get the next execution date after the given
``date_last_automated_dagrun``, according to
- schedule_interval, start_date, end_date etc. This doesn't check max
active run or any other
- "concurrency" type limits, it only performs calculations based on the
various date and interval fields
- of this dag and it's tasks.
-
- :param date_last_automated_dagrun: The execution_date of the last
scheduler or
- backfill triggered run for this dag
- :type date_last_automated_dagrun: pendulum.Pendulum
- """
- if not self.schedule_interval or self.is_subdag:
- return None
-
- # don't schedule @once again
- if self.schedule_interval == '@once' and date_last_automated_dagrun:
- return None
-
- # don't do scheduler catchup for dag's that don't have dag.catchup =
True
- if not (self.catchup or self.schedule_interval == '@once'):
- # The logic is that we move start_date up until
- # one period before, so that timezone.utcnow() is AFTER
- # the period end, and the job can be created...
- now = timezone.utcnow()
- next_start = self.following_schedule(now)
- last_start = self.previous_schedule(now)
- if next_start <= now or isinstance(self.schedule_interval,
timedelta):
- new_start = last_start
- else:
- new_start = self.previous_schedule(last_start)
-
- if self.start_date:
- if new_start >= self.start_date:
- self.start_date = new_start
- else:
- self.start_date = new_start
-
- next_run_date = None
- if not date_last_automated_dagrun:
- # First run
- task_start_dates = [t.start_date for t in self.tasks if
t.start_date]
- if task_start_dates:
- next_run_date = self.normalize_schedule(min(task_start_dates))
- self.log.debug("Next run date based on tasks %s",
next_run_date)
+ return (next_info.data_interval.start, next_info.run_after)
+
+ def _format_time_restriction(self) -> TimeRestriction:
Review comment:
To me, `_format` implies that it returns a string. How about:
```suggestion
def _get_time_restriction(self) -> TimeRestriction:
```
##########
File path: airflow/timetables/base.py
##########
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import NamedTuple, Optional
+
+from pendulum import DateTime
+
+from airflow.typing_compat import Protocol
+
+
+class DataInterval(NamedTuple):
+ """A data interval for a DagRun to operate over.
+
+ The represented interval is ``[start, end)``.
+ """
+
+ start: DateTime
+ end: DateTime
+
+
+class TimeRestriction(NamedTuple):
+ """A period to restrict a datetime between two values.
+
+ This is used to bound the next DagRun schedule to a time period. If the
+ scheduled time is earlier than ``earliest``, it is set to ``earliest``. If
+ the time is later than ``latest``, the DagRun is not scheduled.
+
+ Both values are inclusive; a DagRun can happen exactly at either
+ ``earliest`` or ``latest``.
+ """
+
+ earliest: Optional[DateTime]
+ latest: Optional[DateTime]
+
+
+class DagRunInfo(NamedTuple):
+ """Information to schedule a DagRun.
+
+ Instances of this will be returned by TimeTables when they are asked to
+ schedule a DagRun creation.
+ """
+
+ run_after: DateTime
+ """The earliest time this DagRun is created and its tasks scheduled."""
+
+ data_interval: DataInterval
+ """The data interval this DagRun to operate over, if applicable."""
+
+ @classmethod
+ def exact(cls, at: DateTime) -> "DagRunInfo":
+ """Represent a run on an exact time."""
+ return cls(run_after=at, data_interval=DataInterval(at, at))
+
+ @classmethod
+ def interval(cls, start: DateTime, end: DateTime) -> "DagRunInfo":
+ """Represent a run on a continuous schedule.
+
+ In such a schedule, each data interval starts right after the previous
+ one ends, and each run is scheduled right after the interval ends. This
+ applies to all schedules prior to AIP-39 except ``@once`` and ``None``.
+ """
+ return cls(run_after=end, data_interval=DataInterval(start, end))
+
+
+class TimeTable(Protocol):
+ """Protocol that all TimeTable classes are expected to implement."""
+
+ def cancel_catchup(self, between: TimeRestriction) -> TimeRestriction:
+ """Fix time restriction to not perform catchup."""
+ raise NotImplementedError()
+
+ def next_dagrun_info(
+ self,
+ last_automated_dagrun: Optional[DateTime],
+ between: TimeRestriction,
+ ) -> Optional[DagRunInfo]:
+ """Provide information to schedule the next DagRun.
+
+ :param last_automated_dagrun: The execution_date of the associated
DAG's
+ last scheduled or backfilled run (manual runs not considered).
+ :param later_than: The next DagRun must be scheduled later than this
+ time. This is generally the earliest of ``DAG.start_date`` and each
+ ``BaseOperator.start_date`` in the DAG. None means the next DagRun
+ can happen anytime.
+
+ :return: Information on when the next DagRun can be scheduled. None
+ means a DagRun will not happen. This does not mean no more runs
+ will be scheduled even again for this DAG; the time table can
+ return a DagRunInfo when asked later.
Review comment:
```suggestion
return a DagRunInfo object when asked at another time.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]