XD-DENG commented on a change in pull request #10956:
URL: https://github.com/apache/airflow/pull/10956#discussion_r490526192



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1714,53 +1321,158 @@ def _run_scheduler_loop(self) -> None:
                 )
                 break
 
-    def _validate_and_run_task_instances(self, simple_dag_bag: SimpleDagBag) -> bool:
-        if simple_dag_bag.serialized_dags:
+    def _do_scheduling(self, session) -> int:
+        """
+        This function is where the main scheduling decisions take places. It:
+
+        - Creates any necessary DAG runs by examining the next_dagrun_create_after column of DagModel
+
+        - Finds the "next n oldest" running DAG Runs to examine for scheduling (n=20 by default) and tries to

Review comment:
       Minor: it may be worth mentioning that this default value of 20 can be adjusted via the configuration item `scheduler.max_dagruns_per_query`. This is aimed not at _users_ but at _developers_ reading this part later.
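
As a rough illustration of the point above, the sketch below reads such a setting with a fallback of 20. It uses the standard-library `configparser`, not Airflow's own configuration module, and the helper name `max_dagruns_per_query` is hypothetical; only the section and option names come from the comment.

```python
from configparser import ConfigParser

# Default quoted in the docstring under review ("n=20 by default").
DEFAULT_MAX_DAGRUNS_PER_QUERY = 20


def max_dagruns_per_query(cfg_text: str) -> int:
    """Hypothetical helper: read [scheduler] max_dagruns_per_query from INI text."""
    parser = ConfigParser()
    parser.read_string(cfg_text)
    # The fallback covers both a missing section and a missing option.
    return parser.getint(
        "scheduler",
        "max_dagruns_per_query",
        fallback=DEFAULT_MAX_DAGRUNS_PER_QUERY,
    )


default_value = max_dagruns_per_query("")
overridden_value = max_dagruns_per_query("[scheduler]\nmax_dagruns_per_query = 50\n")
```

A docstring note along these lines would tell later readers where the "20" comes from without having to search the configuration defaults.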

##########
File path: airflow/models/dag.py
##########
@@ -458,6 +458,101 @@ def previous_schedule(self, dttm):
         elif self.normalized_schedule_interval is not None:
             return timezone.convert_to_utc(dttm - self.normalized_schedule_interval)
 
+    def next_dagrun_info(self, date_last_automated_dagrun : Optional[pendulum.DateTime]):
+        """
+        Get information about the next DagRun of this dag after ``date_last_automated_dagrun`` -- the
+        execution date, and the earliest it could be scheduled
+
+        :param date_last_automated_dagrun: The max(execution_date) of existing
+            "automated" DagRuns for this dag (scheduled or backfill, but not
+            manual)
+        """
+        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
+
+        if next_execution_date is None or self.schedule_interval in (None, '@once'):
+            return None
+
+        return {
+            'execution_date': next_execution_date,
+            'can_be_created_after': self.following_schedule(next_execution_date)
+        }

Review comment:
       Not sure, but it may be cleaner to return a tuple instead. Say, line 473 would be `return None, None`, and line 475 would become `return next_execution_date, self.following_schedule(next_execution_date)`.
   
   That way, the call site also becomes simpler:
   
   ```python
   orm_dag.next_dagrun, orm_dag.next_dagrun_create_after = dag.next_dagrun_info(xxx)
   ```
   
   Of course, this is quite a subjective opinion, so feel free to ignore it.
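
To make the suggestion concrete, here is a minimal, self-contained sketch of the tuple-returning variant. `FakeDag` is a hypothetical stand-in with a fixed `timedelta` interval, not Airflow's `DAG` class, and treating a missing previous run as "nothing to schedule" is a simplification for illustration only.

```python
from datetime import datetime, timedelta
from typing import Optional, Tuple


class FakeDag:
    """Hypothetical stand-in for a DAG with a fixed schedule interval."""

    def __init__(self, schedule_interval: Optional[timedelta]) -> None:
        self.schedule_interval = schedule_interval

    def following_schedule(self, dttm: datetime) -> datetime:
        # Next point on the schedule after ``dttm``.
        return dttm + self.schedule_interval

    def next_dagrun_info(
        self, date_last_automated_dagrun: Optional[datetime]
    ) -> Tuple[Optional[datetime], Optional[datetime]]:
        # Always return a 2-tuple so callers can unpack unconditionally.
        if self.schedule_interval is None or date_last_automated_dagrun is None:
            return None, None
        next_execution_date = self.following_schedule(date_last_automated_dagrun)
        return next_execution_date, self.following_schedule(next_execution_date)


dag = FakeDag(timedelta(days=1))
# The call site unpacks both values in one assignment.
next_dagrun, next_dagrun_create_after = dag.next_dagrun_info(datetime(2020, 9, 1))
```

The trade-off is that a tuple relies on positional order, whereas the dict in the diff names each value; a `NamedTuple` would be one way to keep both properties.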

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1464,23 +1070,21 @@ def _execute_task_instances(
         2. Change the state for the TIs above atomically.
         3. Enqueue the TIs in the executor.
 
-        :param simple_dag_bag: TaskInstances associated with DAGs in the
-            simple_dag_bag will be fetched from the DB and executed
-        :type simple_dag_bag: airflow.utils.dag_processing.SimpleDagBag
+        HA note: This function is a "critical section" meaning that only a single executor process can execute
+        this function at the same time. This is achived by doing ``SELECT ... from pool FOR UPDATE``. For DBs

Review comment:
       typo "achived" -> "achieved"




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

