ashb commented on a change in pull request #16401:
URL: https://github.com/apache/airflow/pull/16401#discussion_r659897924



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -984,89 +956,86 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
             # are not updated.
             # We opted to check DagRun existence instead
             # of catching an Integrity error and rolling back the session i.e
-            # we need to run self._update_dag_next_dagruns if the Dag Run already exists or if we
+            # we need to set dag.next_dagrun_info if the Dag Run already exists or if we
             # create a new one. This is so that in the next Scheduling loop we try to create new runs
             # instead of falling in a loop of Integrity Error.
-            if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns:
-                run = dag.create_dagrun(
+            if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
+                dag.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=dag_model.next_dagrun,
-                    start_date=timezone.utcnow(),
-                    state=State.RUNNING,
+                    state=State.QUEUED,
                     external_trigger=False,
                     session=session,
                     dag_hash=dag_hash,
                     creating_job_id=self.id,
                 )
-
-                expected_start_date = dag.following_schedule(run.execution_date)
-                if expected_start_date:
-                    schedule_delay = run.start_date - expected_start_date
-                    Stats.timing(
-                        f'dagrun.schedule_delay.{dag.dag_id}',
-                        schedule_delay,
-                    )
-
-        self._update_dag_next_dagruns(dag_models, session)
+            dag_model.next_dagrun, dag_model.next_dagrun_create_after = dag.next_dagrun_info(
+                dag_model.next_dagrun
+            )
 
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
         # memory for larger dags? or expunge_all()
 
-    def _update_dag_next_dagruns(self, dag_models: Iterable[DagModel], session: Session) -> None:
+    def _start_queued_dagruns(
+        self,
+        session: Session,
+    ) -> int:
         """
-        Bulk update the next_dagrun and next_dagrun_create_after for all the dags.
+        Find DagRuns in queued state and decide moving them to running state
 
-        We batch the select queries to get info about all the dags at once
+        :param dag_run: The DagRun to schedule
         """
-        # Check max_active_runs, to see if we are _now_ at the limit for any of
-        # these dag? (we've just created a DagRun for them after all)
-        active_runs_of_dags = dict(
+        dag_runs = self._get_next_dagruns_to_examine(State.QUEUED, session)
+
+        active_runs_of_dags = defaultdict(
+            lambda: 0,
             session.query(DagRun.dag_id, func.count('*'))
             .filter(
-                DagRun.dag_id.in_([o.dag_id for o in dag_models]),
+                DagRun.dag_id.in_([dr.dag_id for dr in dag_runs]),
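
A side note on the `_start_queued_dagruns` hunk above: `defaultdict(lambda: 0, ...)` is seeded with the query's `(dag_id, count)` rows, so any dag_id with no active runs reads as 0 instead of raising `KeyError`. A minimal standalone sketch of that idiom (the row values here are invented):

```python
# Standalone sketch of the defaultdict idiom used in the hunk above; the
# rows stand in for the (dag_id, count) tuples the SQLAlchemy query yields.
from collections import defaultdict

rows = [("example_dag_a", 3), ("example_dag_b", 1)]  # invented values
active_runs_of_dags = defaultdict(lambda: 0, rows)

assert active_runs_of_dags["example_dag_a"] == 3
assert active_runs_of_dags["dag_with_no_runs"] == 0  # default, no KeyError
```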

Review comment:
      This will potentially put the same dag_id in the query multiple times, which could produce an inefficient query.
   
   ```suggestion
                   DagRun.dag_id.in_(list(set(dr.dag_id for dr in dag_runs))),
   ```
   
    And it probably needs a comment saying why you need `list(set())` (which is that, _I think_, SQLA doesn't accept a set).
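
    For illustration, a standalone sketch of the suggested pattern — `count_active_runs` is a hypothetical name, and the trailing `group_by` is assumed since the hunk is cut off above:

    ```python
    # Hypothetical sketch of the suggested de-duplication; not the PR's code.
    from sqlalchemy import func
    from sqlalchemy.orm import Session

    from airflow.models.dagrun import DagRun


    def count_active_runs(session: Session, dag_runs) -> dict:
        # Many queued runs can share a dag_id, so de-duplicate first to keep
        # the rendered "IN (...)" clause short. list() hedges against SQLA
        # versions that may not accept a raw set for in_(), per the comment
        # above.
        unique_dag_ids = list({dr.dag_id for dr in dag_runs})
        return dict(
            session.query(DagRun.dag_id, func.count('*'))
            .filter(DagRun.dag_id.in_(unique_dag_ids))
            .group_by(DagRun.dag_id)
        )
    ```

    Doing the de-duplication in Python also means the database never has to collapse the repeated values itself.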



