mik-laj commented on a change in pull request #10956:
URL: https://github.com/apache/airflow/pull/10956#discussion_r489342193
##########
File path: airflow/models/dag.py
##########
@@ -458,6 +458,99 @@ def previous_schedule(self, dttm):
elif self.normalized_schedule_interval is not None:
return timezone.convert_to_utc(dttm -
self.normalized_schedule_interval)
def next_dagrun_info(self, date_last_automated_dagrun: Optional[pendulum.DateTime]):
    """
    Get information about the next DagRun of this dag after ``date_last_automated_dagrun`` -- the
    execution date, and the earliest it could be scheduled.

    :param date_last_automated_dagrun: The max(execution_date) of existing
        "automated" DagRuns for this dag (scheduled or backfill, but not
        manual)
    :return: ``None`` if no further automated run should be created, otherwise a dict with
        ``execution_date`` (the next execution date) and ``can_be_created_after`` (the
        earliest moment the run may be created). For ``@once`` DAGs there is no following
        period to wait for, so ``can_be_created_after`` equals ``execution_date``.
    """
    # No schedule_interval: nothing is ever created automatically.
    if self.schedule_interval is None:
        return None

    next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
    if next_execution_date is None:
        return None

    if self.schedule_interval == '@once':
        # next_dagrun_after_date only yields a date for @once when no automated run
        # exists yet, so that single run may be created right away.
        can_be_created_after = next_execution_date
    else:
        can_be_created_after = self.following_schedule(next_execution_date)

    return {
        'execution_date': next_execution_date,
        'can_be_created_after': can_be_created_after,
    }
+
def next_dagrun_after_date(self, date_last_automated_dagrun: Optional[pendulum.DateTime]):
    """
    Get the next execution date after the given ``date_last_automated_dagrun``, according to
    schedule_interval, start_date, end_date etc. This doesn't check max active runs or any other
    "concurrency" type limits; it only performs calculations based on the various date and
    interval fields of this DAG and its tasks.

    .. note::
        When ``catchup`` is disabled and the schedule is not ``@once``, this method may
        overwrite ``self.start_date`` (moving it up to one period before now) as a side effect.

    :param date_last_automated_dagrun: The execution_date of the last scheduler or
        backfill triggered run for this dag, or ``None`` if there has been none
    :type date_last_automated_dagrun: pendulum.Pendulum
    :return: the next execution date, or ``None`` when the DAG should not be scheduled:
        no schedule_interval, an ``@once`` DAG that already ran, no start date could be
        determined, or the date falls after the DAG's or the earliest task ``end_date``
    """
    if not self.schedule_interval:
        return None

    # Don't schedule @once again
    if self.schedule_interval == '@once' and date_last_automated_dagrun:
        return None

    # Don't do scheduler catchup for DAGs that don't have dag.catchup = True
    if not (self.catchup or self.schedule_interval == '@once'):
        # The logic is that we move start_date up until
        # one period before, so that timezone.utcnow() is AFTER
        # the period end, and the job can be created...
        now = timezone.utcnow()
        next_start = self.following_schedule(now)
        last_start = self.previous_schedule(now)
        if next_start <= now or isinstance(self.schedule_interval, timedelta):
            new_start = last_start
        else:
            new_start = self.previous_schedule(last_start)

        # Only ever move start_date forward (or set it if it was unset).
        if not self.start_date or new_start >= self.start_date:
            self.start_date = new_start

    next_run_date = None
    if not date_last_automated_dagrun:
        # First run: start from the earliest task start_date. Tasks without a
        # start_date are skipped, since min() cannot compare None with a datetime.
        task_start_dates = [t.start_date for t in self.tasks if t.start_date]
        if task_start_dates:
            next_run_date = self.normalize_schedule(min(task_start_dates))
            self.log.debug("Next run date based on tasks %s", next_run_date)
    else:
        next_run_date = self.following_schedule(date_last_automated_dagrun)

    if date_last_automated_dagrun and next_run_date:
        while next_run_date <= date_last_automated_dagrun:
            next_run_date = self.following_schedule(next_run_date)

    # Don't ever schedule prior to the dag's start_date
    if self.start_date:
        next_run_date = self.start_date if not next_run_date else max(next_run_date, self.start_date)
        if next_run_date == self.start_date:
            next_run_date = self.normalize_schedule(self.start_date)

        self.log.debug(
            "Dag start date: %s. Next run date: %s",
            self.start_date, next_run_date
        )

    # Don't schedule a dag beyond its end_date (as specified by the dag param)
    if next_run_date and self.end_date and next_run_date > self.end_date:
        return None

    # Don't schedule a dag beyond its end_date (as specified by the task params)
    # Get the min task end date, which may come from the dag.default_args
    task_end_dates = [t.end_date for t in self.tasks if t.end_date]
    if task_end_dates and next_run_date:
        min_task_end_date = min(task_end_dates)
        if next_run_date > min_task_end_date:
            return None

    return next_run_date
+
Review comment:
When does this method return a result?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org