uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r644601486



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table
+        restriction = self._format_time_restriction()
+        if not self.catchup:
+            restriction = time_table.cancel_catchup(restriction)
+        next_info = time_table.next_dagrun_info(
+            timezone.coerce_datetime(date_last_automated_dagrun),
+            restriction,
+        )
+        if next_info is None:
             return (None, None)
-
-        if self.schedule_interval == "@once":
-            # For "@once" it can be created "now"
-            return (next_execution_date, next_execution_date)
-
-        return (next_execution_date, self.following_schedule(next_execution_date))
-
-    def next_dagrun_after_date(self, date_last_automated_dagrun: Optional[pendulum.DateTime]):
-        """
-        Get the next execution date after the given ``date_last_automated_dagrun``, according to
-        schedule_interval, start_date, end_date etc.  This doesn't check max active run or any other
-        "concurrency" type limits, it only performs calculations based on the various date and interval fields
-        of this dag and it's tasks.
-
-        :param date_last_automated_dagrun: The execution_date of the last scheduler or
-            backfill triggered run for this dag
-        :type date_last_automated_dagrun: pendulum.Pendulum
-        """
-        if not self.schedule_interval or self.is_subdag:
-            return None
-
-        # don't schedule @once again
-        if self.schedule_interval == '@once' and date_last_automated_dagrun:
-            return None
-
-        # don't do scheduler catchup for dag's that don't have dag.catchup = True
-        if not (self.catchup or self.schedule_interval == '@once'):
-            # The logic is that we move start_date up until
-            # one period before, so that timezone.utcnow() is AFTER
-            # the period end, and the job can be created...
-            now = timezone.utcnow()
-            next_start = self.following_schedule(now)
-            last_start = self.previous_schedule(now)
-            if next_start <= now or isinstance(self.schedule_interval, timedelta):
-                new_start = last_start
-            else:
-                new_start = self.previous_schedule(last_start)
-
-            if self.start_date:
-                if new_start >= self.start_date:
-                    self.start_date = new_start
-            else:
-                self.start_date = new_start
-
-        next_run_date = None
-        if not date_last_automated_dagrun:
-            # First run
-            task_start_dates = [t.start_date for t in self.tasks if t.start_date]
-            if task_start_dates:
-                next_run_date = self.normalize_schedule(min(task_start_dates))
-                self.log.debug("Next run date based on tasks %s", next_run_date)
+        return (next_info.data_interval.start, next_info.run_after)
+
+    def _format_time_restriction(self) -> TimeRestriction:

Review comment:
I turned this into a cached property instead, since the value is also needed in `DAG.get_run_dates()`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to