This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 20d8142869 Fix scheduler logic to plan new dag runs by ignoring manual runs (#34027)
20d8142869 is described below
commit 20d81428699db240b65f72a92183255c24e8c19b
Author: Daniel Dyląg <[email protected]>
AuthorDate: Tue Sep 5 15:01:33 2023 +0200
Fix scheduler logic to plan new dag runs by ignoring manual runs (#34027)
* Fix manual task triggering scheduled tasks
Fixes #33949
* fix static checks
* static checks
* add unit test
* static check
* Undo renaming
* Update airflow/jobs/scheduler_job_runner.py
Co-authored-by: Tzu-ping Chung <[email protected]>
* use keyword-only arguments for last_dag_run and total_active_runs
---------
Co-authored-by: daniel.dylag <[email protected]>
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/jobs/scheduler_job_runner.py | 33 +++++++++++++++++-------
tests/jobs/test_scheduler_job.py | 50 ++++++++++++++++++++++++++++++++----
2 files changed, 69 insertions(+), 14 deletions(-)
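In short, _should_update_dag_next_dagruns gains a keyword-only last_dag_run parameter and declines to advance the DAG's next-dagrun fields unless the run that triggered the check is a finished automated run (scheduled or backfill). A minimal standalone sketch of that guard, paraphrased from the hunks below (the helper name is hypothetical and the imports assume Airflow 2.x; this is not the committed code verbatim):

    from airflow.utils.state import State
    from airflow.utils.types import DagRunType

    def _skip_next_dagrun_update(last_dag_run) -> bool:
        # Hypothetical helper mirroring the new early return in
        # _should_update_dag_next_dagruns: only a finished, automated run
        # (scheduled or backfill) may let the scheduler plan the next dag run;
        # manual and dataset-triggered runs must not.
        if last_dag_run is None:
            return False  # caller is creating new runs, not reacting to a finished one
        finished = last_dag_run.state in State.finished_dr_states
        automated = last_dag_run.run_type in (DagRunType.SCHEDULED, DagRunType.BACKFILL_JOB)
        return not (finished and automated)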
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 1507a6f06f..6c4652ef1a 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1189,7 +1189,11 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
)
active_runs_of_dags[dag.dag_id] += 1
if self._should_update_dag_next_dagruns(
- dag, dag_model, active_runs_of_dags[dag.dag_id], session=session
+ dag,
+ dag_model,
+ last_dag_run=None,
+ total_active_runs=active_runs_of_dags[dag.dag_id],
+ session=session,
):
dag_model.calculate_dagrun_date_fields(dag, data_interval)
# TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
@@ -1297,9 +1301,22 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
)
def _should_update_dag_next_dagruns(
- self, dag: DAG, dag_model: DagModel, total_active_runs: int | None = None, *, session: Session
+ self,
+ dag: DAG,
+ dag_model: DagModel,
+ *,
+ last_dag_run: DagRun | None = None,
+ total_active_runs: int | None = None,
+ session: Session,
) -> bool:
"""Check if the dag's next_dagruns_create_after should be updated."""
+ # If last_dag_run is defined, the update was triggered by a scheduling decision in this DAG run.
+ # In such case, schedule next only if last_dag_run is finished and was an automated run.
+ if last_dag_run and not (
+ last_dag_run.state in State.finished_dr_states
+ and last_dag_run.run_type in [DagRunType.SCHEDULED,
DagRunType.BACKFILL_JOB]
+ ):
+ return False
# If the DAG never schedules skip save runtime
if not dag.timetable.can_be_scheduled:
return False
@@ -1434,8 +1451,8 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
session.merge(task_instance)
session.flush()
self.log.info("Run %s of %s has timed-out", dag_run.run_id,
dag_run.dag_id)
- # Work out if we should allow creating a new DagRun now?
- if self._should_update_dag_next_dagruns(dag, dag_model, session=session):
+
+ if self._should_update_dag_next_dagruns(dag, dag_model, last_dag_run=dag_run, session=session):
dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
callback_to_execute = DagCallbackRequest(
@@ -1462,11 +1479,9 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
return callback
# TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
- # Check if DAG not scheduled then skip interval calculation to same scheduler runtime
- if dag_run.state in State.finished_dr_states:
- # Work out if we should allow creating a new DagRun now?
- if self._should_update_dag_next_dagruns(dag, dag_model, session=session):
- dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
+
+ if self._should_update_dag_next_dagruns(dag, dag_model, last_dag_run=dag_run, session=session):
+ dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
# This will do one query per dag run. We "could" build up a complex
# query to update all the TIs across all the execution dates and dag
# IDs in a single query, but it turns out that can be _very very slow_
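Both updated call sites above now pass the new arguments by keyword only; a sketch of the resulting call path (variable names taken from the hunks above, enclosing method context assumed) looks like:

    # Assumed to run inside SchedulerJobRunner with dag, dag_model, dag_run and
    # session already in scope, mirroring the '+' lines above.
    if self._should_update_dag_next_dagruns(
        dag,
        dag_model,
        last_dag_run=dag_run,  # the run whose state was just evaluated
        session=session,       # total_active_runs is left at its default on this path
    ):
        dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))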
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 94cf3b6728..4fdb006a38 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1756,10 +1756,7 @@ class TestSchedulerJob:
# Need to use something that doesn't immediately get marked as success by the scheduler
BashOperator(task_id="task", bash_command="true")
- dag_run = dag_maker.create_dagrun(
- state=State.RUNNING,
- session=session,
- )
+ dag_run = dag_maker.create_dagrun(state=State.RUNNING, session=session, run_type=DagRunType.SCHEDULED)
# Reach max_active_runs
for _ in range(3):
@@ -3458,7 +3455,50 @@ class TestSchedulerJob:
self.job_runner = SchedulerJobRunner(job=scheduler_job)
assert excepted is self.job_runner._should_update_dag_next_dagruns(
- dag, dag_model, number_running, session=session
+ dag, dag_model, total_active_runs=number_running, session=session
+ )
+
+ @pytest.mark.parametrize(
+ "run_type, should_update",
+ [
+ (DagRunType.MANUAL, False),
+ (DagRunType.SCHEDULED, True),
+ (DagRunType.BACKFILL_JOB, True),
+ (DagRunType.DATASET_TRIGGERED, False),
+ ],
+ ids=[
+ DagRunType.MANUAL.name,
+ DagRunType.SCHEDULED.name,
+ DagRunType.BACKFILL_JOB.name,
+ DagRunType.DATASET_TRIGGERED.name,
+ ],
+ )
+ def test_should_update_dag_next_dagruns_after_run_type(self, run_type, should_update, session, dag_maker):
+ """Test that whether next dagrun is updated depends on run type"""
+ with dag_maker(
+ dag_id="test_should_update_dag_next_dagruns_after_run_type",
+ schedule="*/1 * * * *",
+ max_active_runs=10,
+ ) as dag:
+ EmptyOperator(task_id="dummy")
+
+ dag_model = dag_maker.dag_model
+
+ run = dag_maker.create_dagrun(
+ run_id="run",
+ run_type=run_type,
+ execution_date=DEFAULT_DATE,
+ start_date=timezone.utcnow(),
+ state=State.SUCCESS,
+ session=session,
+ )
+
+ session.flush()
+ scheduler_job = Job(executor=self.null_exec)
+ self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+ assert should_update is self.job_runner._should_update_dag_next_dagruns(
+ dag, dag_model, last_dag_run=run, total_active_runs=0, session=session
)
def test_create_dag_runs(self, dag_maker):