kaxil commented on a change in pull request #16401:
URL: https://github.com/apache/airflow/pull/16401#discussion_r660149354



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -980,89 +952,86 @@ def _create_dag_runs(self, dag_models: 
Iterable[DagModel], session: Session) ->
             # are not updated.
             # We opted to check DagRun existence instead
             # of catching an Integrity error and rolling back the session i.e
-            # we need to run self._update_dag_next_dagruns if the Dag Run 
already exists or if we
+            # we need to set dag.next_dagrun_info if the Dag Run already 
exists or if we
             # create a new one. This is so that in the next Scheduling loop we 
try to create new runs
             # instead of falling in a loop of Integrity Error.
-            if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns:
-                run = dag.create_dagrun(
+            if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
+                dag.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=dag_model.next_dagrun,
-                    start_date=timezone.utcnow(),
-                    state=State.RUNNING,
+                    state=State.QUEUED,
                     external_trigger=False,
                     session=session,
                     dag_hash=dag_hash,
                     creating_job_id=self.id,
                 )
-
-                expected_start_date = 
dag.following_schedule(run.execution_date)
-                if expected_start_date:
-                    schedule_delay = run.start_date - expected_start_date
-                    Stats.timing(
-                        f'dagrun.schedule_delay.{dag.dag_id}',
-                        schedule_delay,
-                    )
-
-        self._update_dag_next_dagruns(dag_models, session)
+            dag_model.next_dagrun, dag_model.next_dagrun_create_after = 
dag.next_dagrun_info(
+                dag_model.next_dagrun
+            )
 
         # TODO[HA]: Should we do a session.flush() so we don't have to keep 
lots of state/object in
         # memory for larger dags? or expunge_all()
 
-    def _update_dag_next_dagruns(self, dag_models: Iterable[DagModel], 
session: Session) -> None:
+    def _start_queued_dagruns(
+        self,
+        session: Session,
+    ) -> int:
         """
-        Bulk update the next_dagrun and next_dagrun_create_after for all the 
dags.
+        Find DagRuns in queued state and decide moving them to running state
 
-        We batch the select queries to get info about all the dags at once
+        :param dag_run: The DagRun to schedule
         """
-        # Check max_active_runs, to see if we are _now_ at the limit for any of
-        # these dag? (we've just created a DagRun for them after all)
-        active_runs_of_dags = dict(
+        dag_runs = self._get_next_dagruns_to_examine(State.QUEUED, session)
+
+        active_runs_of_dags = defaultdict(
+            lambda: 0,
             session.query(DagRun.dag_id, func.count('*'))
-            .filter(
-                DagRun.dag_id.in_([o.dag_id for o in dag_models]),
+            .filter(  # We use `list` here because SQLA doesn't accept a set
+                DagRun.dag_id.in_(list({dr.dag_id for dr in dag_runs})),

Review comment:
       ```suggestion
                   DagRun.dag_id.in_(list(dr.dag_id for dr in dag_runs)),
   ```

##########
File path: 
airflow/migrations/versions/a84a5abfca95_update_dagrun_state_and_datetime.py
##########
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""update-dagrun-state-and-datetime
+
+Revision ID: a84a5abfca95
+Revises: 30867afad44a
+Create Date: 2021-06-13 11:08:18.705168
+
+"""
+import sqlalchemy as sa
+from alembic import op
+
+from airflow.utils.dates import timezone
+from airflow.utils.state import State
+
+# revision identifiers, used by Alembic.
+revision = 'a84a5abfca95'
+down_revision = '30867afad44a'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Set default start_time for dagrun to None and set default state 
to QUEUED"""
+    with op.batch_alter_table('dag_run') as batch_op:
+        batch_op.alter_column(
+            'start_date',
+            type_=sa.TIMESTAMP(timezone=True),
+            existing_server_default=timezone.utcnow(),
+            server_default=None,
+        )
+        batch_op.alter_column(
+            'state',
+            type_=sa.String(length=50),
+            existing_server_default=State.RUNNING,
+            server_default=State.QUEUED,
+        )
+
+
+def downgrade():
+    """Unapply Set default start_time for dagrun to None and set default state 
to QUEUED"""
+    with op.batch_alter_table('dag_run') as batch_op:
+        batch_op.alter_column(
+            'start_date',
+            type_=sa.TIMESTAMP(timezone=True),
+            existing_server_default=None,
+            server_default=timezone.utcnow(),
+        )
+        batch_op.alter_column(
+            'state',
+            type_=sa.String(length=50),
+            existing_server_default=State.QUEUED,
+            server_default=State.RUNNING,

Review comment:
       same here

##########
File path: 
airflow/migrations/versions/a84a5abfca95_update_dagrun_state_and_datetime.py
##########
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""update-dagrun-state-and-datetime
+
+Revision ID: a84a5abfca95
+Revises: 30867afad44a
+Create Date: 2021-06-13 11:08:18.705168
+
+"""
+import sqlalchemy as sa
+from alembic import op
+
+from airflow.utils.dates import timezone
+from airflow.utils.state import State
+
+# revision identifiers, used by Alembic.
+revision = 'a84a5abfca95'
+down_revision = '30867afad44a'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Set default start_time for dagrun to None and set default state 
to QUEUED"""
+    with op.batch_alter_table('dag_run') as batch_op:
+        batch_op.alter_column(
+            'start_date',
+            type_=sa.TIMESTAMP(timezone=True),
+            existing_server_default=timezone.utcnow(),
+            server_default=None,
+        )
+        batch_op.alter_column(
+            'state',
+            type_=sa.String(length=50),
+            existing_server_default=State.RUNNING,
+            server_default=State.QUEUED,

Review comment:
       ```suggestion
   ```
   
   We don't want to set a `server_default`; instead, we set Python-side defaults

##########
File path: 
airflow/migrations/versions/a84a5abfca95_update_dagrun_state_and_datetime.py
##########
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""update-dagrun-state-and-datetime
+
+Revision ID: a84a5abfca95
+Revises: 30867afad44a
+Create Date: 2021-06-13 11:08:18.705168
+
+"""
+import sqlalchemy as sa
+from alembic import op
+
+from airflow.utils.dates import timezone
+from airflow.utils.state import State
+
+# revision identifiers, used by Alembic.
+revision = 'a84a5abfca95'
+down_revision = '30867afad44a'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Set default start_time for dagrun to None and set default state 
to QUEUED"""
+    with op.batch_alter_table('dag_run') as batch_op:
+        batch_op.alter_column(
+            'start_date',
+            type_=sa.TIMESTAMP(timezone=True),
+            existing_server_default=timezone.utcnow(),
+            server_default=None,
+        )
+        batch_op.alter_column(
+            'state',
+            type_=sa.String(length=50),
+            existing_server_default=State.RUNNING,
+            server_default=State.QUEUED,
+        )
+
+
+def downgrade():
+    """Unapply Set default start_time for dagrun to None and set default state 
to QUEUED"""
+    with op.batch_alter_table('dag_run') as batch_op:
+        batch_op.alter_column(
+            'start_date',
+            type_=sa.TIMESTAMP(timezone=True),
+            existing_server_default=None,
+            server_default=timezone.utcnow(),

Review comment:
       same here

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -215,7 +214,7 @@ def _change_state_for_tis_without_dagrun(
             .filter(models.TaskInstance.state.in_(old_states))
             .filter(
                 or_(
-                    models.DagRun.state != State.RUNNING,
+                    models.DagRun.state.notin_([State.RUNNING, State.QUEUED]),

Review comment:
       L200 needs updating to say "not in the running or queued state"

##########
File path: 
airflow/migrations/versions/a84a5abfca95_update_dagrun_state_and_datetime.py
##########
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""update-dagrun-state-and-datetime
+
+Revision ID: a84a5abfca95
+Revises: 30867afad44a
+Create Date: 2021-06-13 11:08:18.705168
+
+"""
+import sqlalchemy as sa
+from alembic import op
+
+from airflow.utils.dates import timezone
+from airflow.utils.state import State
+
+# revision identifiers, used by Alembic.
+revision = 'a84a5abfca95'
+down_revision = '30867afad44a'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Set default start_time for dagrun to None and set default state 
to QUEUED"""
+    with op.batch_alter_table('dag_run') as batch_op:
+        batch_op.alter_column(
+            'start_date',
+            type_=sa.TIMESTAMP(timezone=True),
+            existing_server_default=timezone.utcnow(),

Review comment:
       
![image](https://user-images.githubusercontent.com/8811558/123711038-b08a0b00-d867-11eb-9fd8-60565abfd999.png)
   
   
https://alembic.sqlalchemy.org/en/latest/ops.html#alembic.operations.Operations.add_column

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1591,89 +1567,85 @@ def _create_dag_runs(self, dag_models: 
Iterable[DagModel], session: Session) ->
             # are not updated.
             # We opted to check DagRun existence instead
             # of catching an Integrity error and rolling back the session i.e
-            # we need to run self._update_dag_next_dagruns if the Dag Run 
already exists or if we
+            # we need to set dag.next_dagrun_info if the Dag Run already 
exists or if we
             # create a new one. This is so that in the next Scheduling loop we 
try to create new runs
             # instead of falling in a loop of Integrity Error.
-            if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns:
-                run = dag.create_dagrun(
+            if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
+                dag.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=dag_model.next_dagrun,
-                    start_date=timezone.utcnow(),
-                    state=State.RUNNING,
+                    state=State.QUEUED,
                     external_trigger=False,
                     session=session,
                     dag_hash=dag_hash,
                     creating_job_id=self.id,
                 )
-
-                expected_start_date = 
dag.following_schedule(run.execution_date)
-                if expected_start_date:
-                    schedule_delay = run.start_date - expected_start_date
-                    Stats.timing(
-                        f'dagrun.schedule_delay.{dag.dag_id}',
-                        schedule_delay,
-                    )
-
-        self._update_dag_next_dagruns(dag_models, session)
+            dag_model.next_dagrun, dag_model.next_dagrun_create_after = 
dag.next_dagrun_info(
+                dag_model.next_dagrun
+            )
 
         # TODO[HA]: Should we do a session.flush() so we don't have to keep 
lots of state/object in
         # memory for larger dags? or expunge_all()
 
-    def _update_dag_next_dagruns(self, dag_models: Iterable[DagModel], 
session: Session) -> None:
+    def _start_dagrun(
+        self,
+        guard,
+        session: Session,
+    ) -> int:
         """
-        Bulk update the next_dagrun and next_dagrun_create_after for all the 
dags.
-
-        We batch the select queries to get info about all the dags at once
+        Find DagRuns in queued state and decide moving them to running state
+        :param dag_run: The DagRun to schedule

Review comment:
       We don't have a `dag_run` param on this method though
   
   

##########
File path: 
airflow/migrations/versions/a84a5abfca95_update_dagrun_state_and_datetime.py
##########
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""update-dagrun-state-and-datetime
+
+Revision ID: a84a5abfca95
+Revises: 30867afad44a
+Create Date: 2021-06-13 11:08:18.705168
+
+"""
+import sqlalchemy as sa
+from alembic import op
+
+from airflow.utils.dates import timezone
+from airflow.utils.state import State
+
+# revision identifiers, used by Alembic.
+revision = 'a84a5abfca95'
+down_revision = '30867afad44a'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Set default start_time for dagrun to None and set default state 
to QUEUED"""
+    with op.batch_alter_table('dag_run') as batch_op:
+        batch_op.alter_column(
+            'start_date',
+            type_=sa.TIMESTAMP(timezone=True),
+            existing_server_default=timezone.utcnow(),

Review comment:
       We don't have any `server_default`; we just had a `default`:
   
   https://docs.sqlalchemy.org/en/14/core/defaults.html#scalar-defaults
   
   
https://docs.sqlalchemy.org/en/14/core/metadata.html#sqlalchemy.schema.Column.params.server_default
   
https://docs.sqlalchemy.org/en/14/core/metadata.html#sqlalchemy.schema.Column.params.default

##########
File path: airflow/models/dag.py
##########
@@ -1423,7 +1423,7 @@ def clear(
         confirm_prompt=False,
         include_subdags=True,
         include_parentdag=True,
-        dag_run_state: str = State.RUNNING,
+        dag_run_state: str = State.QUEUED,

Review comment:
       Similarly, we should update L1552 too:
   
   
https://github.com/apache/airflow/blob/0ed4445635a70645e53fdf433356829b504e7d53/airflow/models/dag.py#L1552

##########
File path: docs/apache-airflow/migrations-ref.rst
##########
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are 
executed via when you ru
 
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | 
Description                                                                     
      |
 
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``30867afad44a`` (head)        | ``e9304a3141f0`` |                 | Rename 
``concurrency`` column in ``dag`` table to`` max_active_tasks``                |
+| ``a84a5abfca95`` (head)        | ``30867afad44a`` |                 | Set 
default start_time for dagrun to null and set default state to QUEUED           
  |

Review comment:
       We might not need a migration at all!

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -980,89 +952,86 @@ def _create_dag_runs(self, dag_models: 
Iterable[DagModel], session: Session) ->
             # are not updated.
             # We opted to check DagRun existence instead
             # of catching an Integrity error and rolling back the session i.e
-            # we need to run self._update_dag_next_dagruns if the Dag Run 
already exists or if we
+            # we need to set dag.next_dagrun_info if the Dag Run already 
exists or if we
             # create a new one. This is so that in the next Scheduling loop we 
try to create new runs
             # instead of falling in a loop of Integrity Error.
-            if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns:
-                run = dag.create_dagrun(
+            if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
+                dag.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=dag_model.next_dagrun,
-                    start_date=timezone.utcnow(),
-                    state=State.RUNNING,
+                    state=State.QUEUED,

Review comment:
       Can be a separate PR but do you think it is worth adding `queued_dttm` 
similar to TaskInstances:
   
   
https://github.com/apache/airflow/blob/57dcac22137bc958c1ed9f12fa54484e13411a6f/airflow/models/taskinstance.py#L307
   
   This can help in the future for benchmarking the difference between the Queued and Start 
times of a DagRun

##########
File path: 
airflow/migrations/versions/a84a5abfca95_update_dagrun_state_and_datetime.py
##########
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""update-dagrun-state-and-datetime
+
+Revision ID: a84a5abfca95
+Revises: 30867afad44a
+Create Date: 2021-06-13 11:08:18.705168
+
+"""
+import sqlalchemy as sa
+from alembic import op
+
+from airflow.utils.dates import timezone
+from airflow.utils.state import State
+
+# revision identifiers, used by Alembic.
+revision = 'a84a5abfca95'
+down_revision = '30867afad44a'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Set default start_time for dagrun to None and set default state 
to QUEUED"""
+    with op.batch_alter_table('dag_run') as batch_op:
+        batch_op.alter_column(
+            'start_date',
+            type_=sa.TIMESTAMP(timezone=True),
+            existing_server_default=timezone.utcnow(),
+            server_default=None,

Review comment:
       ```suggestion
   ```

##########
File path: 
airflow/migrations/versions/a84a5abfca95_update_dagrun_state_and_datetime.py
##########
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""update-dagrun-state-and-datetime
+
+Revision ID: a84a5abfca95
+Revises: 30867afad44a
+Create Date: 2021-06-13 11:08:18.705168
+
+"""
+import sqlalchemy as sa
+from alembic import op
+
+from airflow.utils.dates import timezone
+from airflow.utils.state import State
+
+# revision identifiers, used by Alembic.
+revision = 'a84a5abfca95'
+down_revision = '30867afad44a'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply Set default start_time for dagrun to None and set default state 
to QUEUED"""
+    with op.batch_alter_table('dag_run') as batch_op:
+        batch_op.alter_column(
+            'start_date',
+            type_=sa.TIMESTAMP(timezone=True),
+            existing_server_default=timezone.utcnow(),
+            server_default=None,
+        )
+        batch_op.alter_column(
+            'state',
+            type_=sa.String(length=50),
+            existing_server_default=State.RUNNING,
+            server_default=State.QUEUED,

Review comment:
       tbh I don't think we need a migration at all !!

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -980,89 +952,86 @@ def _create_dag_runs(self, dag_models: 
Iterable[DagModel], session: Session) ->
             # are not updated.
             # We opted to check DagRun existence instead
             # of catching an Integrity error and rolling back the session i.e
-            # we need to run self._update_dag_next_dagruns if the Dag Run 
already exists or if we
+            # we need to set dag.next_dagrun_info if the Dag Run already 
exists or if we
             # create a new one. This is so that in the next Scheduling loop we 
try to create new runs
             # instead of falling in a loop of Integrity Error.
-            if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns:
-                run = dag.create_dagrun(
+            if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
+                dag.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=dag_model.next_dagrun,
-                    start_date=timezone.utcnow(),
-                    state=State.RUNNING,
+                    state=State.QUEUED,
                     external_trigger=False,
                     session=session,
                     dag_hash=dag_hash,
                     creating_job_id=self.id,
                 )
-
-                expected_start_date = 
dag.following_schedule(run.execution_date)
-                if expected_start_date:
-                    schedule_delay = run.start_date - expected_start_date
-                    Stats.timing(
-                        f'dagrun.schedule_delay.{dag.dag_id}',
-                        schedule_delay,
-                    )
-
-        self._update_dag_next_dagruns(dag_models, session)
+            dag_model.next_dagrun, dag_model.next_dagrun_create_after = 
dag.next_dagrun_info(
+                dag_model.next_dagrun
+            )
 
         # TODO[HA]: Should we do a session.flush() so we don't have to keep 
lots of state/object in
         # memory for larger dags? or expunge_all()
 
-    def _update_dag_next_dagruns(self, dag_models: Iterable[DagModel], 
session: Session) -> None:
+    def _start_queued_dagruns(
+        self,
+        session: Session,
+    ) -> int:
         """
-        Bulk update the next_dagrun and next_dagrun_create_after for all the 
dags.
+        Find DagRuns in queued state and decide moving them to running state
 
-        We batch the select queries to get info about all the dags at once
+        :param dag_run: The DagRun to schedule
         """
-        # Check max_active_runs, to see if we are _now_ at the limit for any of
-        # these dag? (we've just created a DagRun for them after all)
-        active_runs_of_dags = dict(
+        dag_runs = self._get_next_dagruns_to_examine(State.QUEUED, session)
+
+        active_runs_of_dags = defaultdict(
+            lambda: 0,
             session.query(DagRun.dag_id, func.count('*'))
-            .filter(
-                DagRun.dag_id.in_([o.dag_id for o in dag_models]),
+            .filter(  # We use `list` here because SQLA doesn't accept a set
+                DagRun.dag_id.in_(list({dr.dag_id for dr in dag_runs})),

Review comment:
       oh yeah that makes sense, can you add a comment about removing 
duplicates (unless you have already and I missed it)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to