This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 71810e55336 Remove stale dag check in the scheduler (#47304)
71810e55336 is described below
commit 71810e55336f14d419f1d3a31517c5f29390190c
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Mon Mar 3 16:28:45 2025 +0100
Remove stale dag check in the scheduler (#47304)
This commit removes the stale dag check in the scheduler, leaving that
responsibility to the DAG processor. Since the scheduler doesn't run
the dag processor anymore, it shouldn't be determining if a DAG is stale.
---
airflow/jobs/scheduler_job_runner.py | 28 ----------------------------
tests/jobs/test_scheduler_job.py | 29 -----------------------------
2 files changed, 57 deletions(-)
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 0afd8904d30..6cb90e46540 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1024,11 +1024,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
self._update_asset_orphanage,
)
- timers.call_regular_interval(
- conf.getfloat("scheduler", "parsing_cleanup_interval"),
- self._cleanup_stale_dags,
- )
-
if any(x.is_local for x in self.job.executors):
bundle_cleanup_mgr = BundleUsageTrackingManager()
check_interval = conf.getint(
@@ -2062,29 +2057,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
return zombie_message_details
- @provide_session
- def _cleanup_stale_dags(self, session: Session = NEW_SESSION) -> None:
- """
- Find all dags that were not updated by Dag Processor recently and mark them as inactive.
-
- In case one of DagProcessors is stopped (in case there are multiple of them
- for different dag folders), its dags are never marked as inactive.
- TODO: AIP-66 Does it make sense to mark them as inactive just because the processor isn't running?
- """
- self.log.debug("Checking dags not parsed within last %s seconds.", self._dag_stale_not_seen_duration)
- limit_lpt = timezone.utcnow() - timedelta(seconds=self._dag_stale_not_seen_duration)
- stale_dags = session.scalars(
- select(DagModel).where(DagModel.is_active, DagModel.last_parsed_time < limit_lpt)
- ).all()
- if not stale_dags:
- self.log.debug("Not stale dags found.")
- return
-
- self.log.info("Found (%d) stales dags not parsed after %s.", len(stale_dags), limit_lpt)
- for dag in stale_dags:
- dag.is_active = False
- session.flush()
-
@provide_session
def _update_asset_orphanage(self, session: Session = NEW_SESSION) -> None:
"""
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index f4704390cc3..b4ebc4761e3 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -5680,35 +5680,6 @@ class TestSchedulerJob:
callback_requests[0].ti = None
assert expected_failure_callback_requests[0] == callback_requests[0]
- def test_cleanup_stale_dags(self, testing_dag_bundle):
- dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False)
- with create_session() as session:
- dag = dagbag.get_dag("test_example_bash_operator")
- DAG.bulk_write_to_db("testing", None, [dag])
- dm = DagModel.get_current("test_example_bash_operator")
- # Make it "stale".
- dm.last_parsed_time = timezone.utcnow() - timedelta(minutes=11)
- session.merge(dm)
-
- # This one should remain active.
- dag = dagbag.get_dag("test_start_date_scheduling")
- DAG.bulk_write_to_db("testing", None, [dag])
-
- session.flush()
-
- scheduler_job = Job(executor=MockExecutor())
- self.job_runner = SchedulerJobRunner(job=scheduler_job)
-
- active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
- assert active_dag_count == 2
-
- self.job_runner._cleanup_stale_dags(session)
-
- session.flush()
-
- active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
- assert active_dag_count == 1
-
@mock.patch.object(settings, "USE_JOB_SCHEDULE", False)
def run_scheduler_until_dagrun_terminal(self):
"""