ashb commented on a change in pull request #18897:
URL: https://github.com/apache/airflow/pull/18897#discussion_r726913879
##########
File path: airflow/models/dag.py
##########
@@ -2425,6 +2425,17 @@ def bulk_write_to_db(cls, dags: Collection["DAG"], session=None):
)
most_recent_runs = {run.dag_id: run for run in most_recent_runs_iter}
+ # Get number of active dagruns for all dags we are processing as a single query.
+ num_active_runs = dict(
+ session.query(DagRun.dag_id, func.count('*'))
+ .filter(
+ DagRun.dag_id.in_(existing_dag_ids),
+ DagRun.state == State.RUNNING,
Review comment:
Shouldn't this include queued too?
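Something like this, perhaps (sketch only — the `group_by`/`all` tail is assumed since the hunk is truncated, and it presumes this branch has `State.QUEUED` as a dag run state):

```python
from sqlalchemy import func

from airflow.models.dagrun import DagRun
from airflow.utils.state import State

# Sketch: count queued runs as well as running ones, so queued runs
# also count towards max_active_runs. `session` and `existing_dag_ids`
# come from the surrounding bulk_write_to_db code.
num_active_runs = dict(
    session.query(DagRun.dag_id, func.count('*'))
    .filter(
        DagRun.dag_id.in_(existing_dag_ids),
        DagRun.state.in_([State.RUNNING, State.QUEUED]),
    )
    .group_by(DagRun.dag_id)
    .all()
)
```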
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -878,12 +877,47 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
dag_hash=dag_hash,
creating_job_id=self.id,
)
- queued_runs_of_dags[dag_model.dag_id] += 1
- dag_model.calculate_dagrun_date_fields(dag, data_interval)
-
+ self._update_dag_next_dagruns(dag_models, session)
Review comment:
Can we not re-use the max_active_runs counts we've already got and pass them in to this function? It seems wasteful to perform the same query twice in quick succession.
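To illustrate (just a sketch — the `active_runs_of_dags` parameter name and the fallback query are my assumptions, not the PR's code):

```python
from typing import Collection, Dict, Optional

from sqlalchemy import func
from sqlalchemy.orm import Session

from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun
from airflow.utils.state import State


def _update_dag_next_dagruns(
    self,
    dag_models: Collection[DagModel],
    session: Session,
    active_runs_of_dags: Optional[Dict[str, int]] = None,
) -> None:
    # Re-use the counts the caller already fetched (e.g. in
    # _create_dag_runs); only fall back to querying when they
    # weren't passed in.
    if active_runs_of_dags is None:
        active_runs_of_dags = dict(
            session.query(DagRun.dag_id, func.count('*'))
            .filter(
                DagRun.dag_id.in_([dm.dag_id for dm in dag_models]),
                DagRun.state == State.RUNNING,
            )
            .group_by(DagRun.dag_id)
            .all()
        )
    # ... rest of the method unchanged ...
```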
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -970,6 +1004,9 @@ def _schedule_dag_run(
session.flush()
self.log.info("Run %s of %s has timed-out", dag_run.run_id,
dag_run.dag_id)
+ # Work out if we should allow creating a new DagRun now?
+ self._update_dag_next_dagruns([session.query(DagModel).get(dag_run.dag_id)], session)
Review comment:
I think we need to also call this after L1029 (`dag_run.update_state`) if the state is now SUCCESS or FAILURE.
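i.e. something along these lines (sketch only — the `update_state` call and variable names are as I remember `_schedule_dag_run`, so the exact surroundings may differ):

```python
schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)

# If the run just reached a terminal state, a max_active_runs slot has
# freed up, so recalculate the dag's next-dagrun fields immediately.
if dag_run.state in (State.SUCCESS, State.FAILED):
    self._update_dag_next_dagruns([session.query(DagModel).get(dag_run.dag_id)], session)
```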