ephraimbuddy commented on a change in pull request #19528:
URL: https://github.com/apache/airflow/pull/19528#discussion_r747250661
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -913,26 +913,23 @@ def _create_dag_runs(self, dag_models:
Collection[DagModel], session: Session) -
creating_job_id=self.id,
)
active_runs_of_dags[dag.dag_id] += 1
- self._update_dag_next_dagruns(dag, dag_model,
active_runs_of_dags[dag.dag_id])
+ if self._should_update_dag_next_dagruns(dag, dag_model,
active_runs_of_dags[dag.dag_id]):
+ dag_model.calculate_dagrun_date_fields(dag, data_interval)
# TODO[HA]: Should we do a session.flush() so we don't have to keep
lots of state/object in
# memory for larger dags? or expunge_all()
- def _update_dag_next_dagruns(self, dag, dag_model: DagModel,
total_active_runs) -> None:
- """
- Update the next_dagrun, next_dagrun_data_interval_start/end
- and next_dagrun_create_after for this dag.
- """
- if total_active_runs >= dag_model.max_active_runs:
+ def _should_update_dag_next_dagruns(self, dag, dag_model: DagModel,
total_active_runs) -> bool:
+ """Check if the dag's next_dagruns_create_after should be updated."""
+ if total_active_runs >= dag.max_active_runs:
self.log.info(
"DAG %s is at (or above) max_active_runs (%d of %d), not
creating any more runs",
dag_model.dag_id,
total_active_runs,
- dag_model.max_active_runs,
+ dag.max_active_runs,
Review comment:
This has already been worked on in https://github.com/apache/airflow/pull/19367.
I decided to update it here as well, because the other PR also deals with some
unrelated changes and is not included in this release.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]