uranusjr commented on a change in pull request #18897:
URL: https://github.com/apache/airflow/pull/18897#discussion_r732245384



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -839,30 +839,19 @@ def _create_dag_runs(self, dag_models: 
Collection[DagModel], session: Session) -
         existing_dagruns = (
             session.query(DagRun.dag_id, 
DagRun.execution_date).filter(existing_dagruns_filter).all()
         )
-        max_queued_dagruns = conf.getint('core', 'max_queued_runs_per_dag')
 
-        queued_runs_of_dags = defaultdict(
+        active_runs_of_dags = defaultdict(
             int,
-            session.query(DagRun.dag_id, func.count('*'))
-            .filter(  # We use `list` here because SQLA doesn't accept a set
-                # We use set to avoid duplicate dag_ids
-                DagRun.dag_id.in_(list({dm.dag_id for dm in dag_models})),
-                DagRun.state == State.QUEUED,
-            )
-            .group_by(DagRun.dag_id)
-            .all(),
+            DagRun.active_runs_of_dags(dag_ids=[dm.dag_id for dm in 
dag_models], session=session),

Review comment:
       ```suggestion
               DagRun.active_runs_of_dags(dag_ids=(dm.dag_id for dm in dag_models), session=session),
   ```
   
   This avoids creating an intermediate list; a generator expression is enough here.

##########
File path: airflow/models/dagrun.py
##########
@@ -207,6 +207,20 @@ def refresh_from_db(self, session: Session = None):
         self.id = dr.id
         self.state = dr.state
 
+    @classmethod
+    @provide_session
+    def active_runs_of_dags(cls, dag_ids=None, only_running=False, 
session=None) -> Dict[str, int]:
+        """Get the number of active dag runs for each dag."""
+        query = session.query(cls.dag_id, func.count('*'))
+        if dag_ids:
+            query = query.filter(cls.dag_id.in_(list(set(dag_ids))))

Review comment:
       It would be worthwhile to move the comment about deduplicating here.
   Also, if `dag_ids` is an empty list, I feel we should filter against that
   empty list (so that no DAGs match) instead of not filtering at all.
   
   ```suggestion
           if dag_ids is not None:
               # 'set' called to avoid duplicate dag_ids, but converted back to 'list'
               # because SQLAlchemy doesn't accept a set here.
               query = query.filter(cls.dag_id.in_(list(set(dag_ids))))
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -899,15 +904,10 @@ def _start_queued_dagruns(
         dag_runs = self._get_next_dagruns_to_examine(State.QUEUED, session)
 
         active_runs_of_dags = defaultdict(
-            lambda: 0,
-            session.query(DagRun.dag_id, func.count('*'))
-            .filter(  # We use `list` here because SQLA doesn't accept a set
-                # We use set to avoid duplicate dag_ids
-                DagRun.dag_id.in_(list({dr.dag_id for dr in dag_runs})),
-                DagRun.state == State.RUNNING,
-            )
-            .group_by(DagRun.dag_id)
-            .all(),
+            int,
+            DagRun.active_runs_of_dags(
+                list({dr.dag_id for dr in dag_runs}), only_running=True, 
session=session

Review comment:
       ```suggestion
                   (dr.dag_id for dr in dag_runs), only_running=True, session=session
   ```
   
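   Same as above: a generator expression avoids creating an intermediate list, and `active_runs_of_dags` already deduplicates the IDs with `list(set(dag_ids))`.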




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to