This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-9-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 55791def1cc878eb05383782235cf0d26f9fa090 Author: Daniel Standish <[email protected]> AuthorDate: Mon May 6 08:24:38 2024 -0700 Only heartbeat if necessary in backfill loop (#39399) Currently, backfill sleeps for a minute in every iteration, which is extremely slow. The reason is that it waits synchronously until a heartbeat is necessary. Since the loop is otherwise fast, this results in waits of up to a minute between syncs. With only the heartbeat change, the loop would spin very fast and generate tons of logs, so I also add a one-second sleep (time.sleep(1)) per iteration to slow it down a bit. (cherry picked from commit fb169536a32b7247ee58ef4bf1e3deccfce23341) --- airflow/jobs/backfill_job_runner.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index 6be72b2c95..ace6a00131 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -46,7 +46,7 @@ from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.dependencies_deps import BACKFILL_QUEUED_DEPS from airflow.timetables.base import DagRunInfo from airflow.utils import helpers, timezone -from airflow.utils.configuration import conf as airflow_conf, tmp_configuration_copy +from airflow.utils.configuration import tmp_configuration_copy from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.state import DagRunState, State, TaskInstanceState @@ -475,10 +475,8 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin): """ executed_run_dates = [] - is_unit_test = airflow_conf.getboolean("core", "unit_test_mode") - while (ti_status.to_run or ti_status.running) and not ti_status.deadlocked: - self.log.debug("*** Clearing out not_ready list ***") + self.log.debug("Clearing out not_ready list") ti_status.not_ready.clear() # we need to execute the tasks bottom to top @@ -697,7 +695,7 @@ class 
BackfillJobRunner(BaseJobRunner, LoggingMixin): self.log.debug(e) perform_heartbeat( - job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=is_unit_test + job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True ) # execute the tasks in the queue executor.heartbeat() @@ -749,6 +747,7 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin): self._log_progress(ti_status) session.commit() + time.sleep(1) # return updated status return executed_run_dates
