This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-9-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 55791def1cc878eb05383782235cf0d26f9fa090
Author: Daniel Standish <[email protected]>
AuthorDate: Mon May 6 08:24:38 2024 -0700

    Only heartbeat if necessary in backfill loop (#39399)
    
    Currently, backfill can block for up to a minute in every loop iteration, which makes it extremely slow.
    
    The reason is that, outside unit-test mode, each iteration calls perform_heartbeat with only_if_necessary=False, so it waits synchronously until the next heartbeat is due. Since the rest of the loop is fast, this results in waits of up to a minute between syncs.
    
    With this change, the heartbeat is only performed when necessary. Without a throttle, the loop would then spin very fast and generate a flood of logs. To slow it down slightly, the loop now sleeps for one second at the end of each iteration.
    
    (cherry picked from commit fb169536a32b7247ee58ef4bf1e3deccfce23341)
---
 airflow/jobs/backfill_job_runner.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/airflow/jobs/backfill_job_runner.py 
b/airflow/jobs/backfill_job_runner.py
index 6be72b2c95..ace6a00131 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -46,7 +46,7 @@ from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_deps import BACKFILL_QUEUED_DEPS
 from airflow.timetables.base import DagRunInfo
 from airflow.utils import helpers, timezone
-from airflow.utils.configuration import conf as airflow_conf, 
tmp_configuration_copy
+from airflow.utils.configuration import tmp_configuration_copy
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import DagRunState, State, TaskInstanceState
@@ -475,10 +475,8 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
         """
         executed_run_dates = []
 
-        is_unit_test = airflow_conf.getboolean("core", "unit_test_mode")
-
         while (ti_status.to_run or ti_status.running) and not 
ti_status.deadlocked:
-            self.log.debug("*** Clearing out not_ready list ***")
+            self.log.debug("Clearing out not_ready list")
             ti_status.not_ready.clear()
 
             # we need to execute the tasks bottom to top
@@ -697,7 +695,7 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
                 self.log.debug(e)
 
             perform_heartbeat(
-                job=self.job, heartbeat_callback=self.heartbeat_callback, 
only_if_necessary=is_unit_test
+                job=self.job, heartbeat_callback=self.heartbeat_callback, 
only_if_necessary=True
             )
             # execute the tasks in the queue
             executor.heartbeat()
@@ -749,6 +747,7 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
 
             self._log_progress(ti_status)
             session.commit()
+            time.sleep(1)
 
         # return updated status
         return executed_run_dates

Reply via email to