This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9b466bd13d Introduce Heartbeat Parameter to Allow Per-LocalTaskJob
Configuration (#32313)
9b466bd13d is described below
commit 9b466bd13dd34d2a37b49687241f54f4d2df3b18
Author: Sai Pragna Etikyala <[email protected]>
AuthorDate: Fri Jul 14 23:56:29 2023 -0700
Introduce Heartbeat Parameter to Allow Per-LocalTaskJob Configuration
(#32313)
* Refactor Job Heartbeat Parameter to Allow Per-Job Configuration
This pull request introduces changes to allow users to set heartbeat
expectations separately for LocalTaskJob. This resolves a current limitation
where all job types share a single configuration parameter for expected
heartbeat time.
related to: https://github.com/apache/airflow/issues/30908
---
airflow/config_templates/config.yml | 9 +++++++++
airflow/config_templates/default_airflow.cfg | 5 +++++
airflow/jobs/local_task_job_runner.py | 7 +++++--
3 files changed, 19 insertions(+), 2 deletions(-)
diff --git a/airflow/config_templates/config.yml
b/airflow/config_templates/config.yml
index 015da426c4..b3161f49b7 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2302,6 +2302,15 @@ scheduler:
type: integer
example: ~
default: "5"
+ local_task_job_heartbeat_sec:
+ description: |
+ The frequency (in seconds) at which the LocalTaskJob should send
heartbeat signals to the
+ scheduler to notify it's still alive. If this value is set to 0, the
heartbeat interval will default
+ to the value of scheduler_zombie_task_threshold.
+ version_added: 2.7.0
+ type: integer
+ example: ~
+ default: "0"
num_runs:
description: |
The number of times to try to schedule each DAG file
diff --git a/airflow/config_templates/default_airflow.cfg
b/airflow/config_templates/default_airflow.cfg
index cff56508df..a71bd3ccaa 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1195,6 +1195,11 @@ job_heartbeat_sec = 5
# how often the scheduler should run (in seconds).
scheduler_heartbeat_sec = 5
+# The frequency (in seconds) at which the LocalTaskJob should send heartbeat
signals to the
+# scheduler to notify it's still alive. If this value is set to 0, the
heartbeat interval will default
+# to the value of scheduler_zombie_task_threshold.
+local_task_job_heartbeat_sec = 0
+
# The number of times to try to schedule each DAG file
# -1 indicates unlimited number
num_runs = -1
diff --git a/airflow/jobs/local_task_job_runner.py
b/airflow/jobs/local_task_job_runner.py
index fd234e4150..6184a3e7fc 100644
--- a/airflow/jobs/local_task_job_runner.py
+++ b/airflow/jobs/local_task_job_runner.py
@@ -157,8 +157,11 @@ class LocalTaskJobRunner(BaseJobRunner["Job |
JobPydantic"], LoggingMixin):
return_code = None
try:
self.task_runner.start()
-
- heartbeat_time_limit = conf.getint("scheduler",
"scheduler_zombie_task_threshold")
+ local_task_job_heartbeat_sec = conf.getint("scheduler",
"local_task_job_heartbeat_sec")
+ if local_task_job_heartbeat_sec < 1:
+ heartbeat_time_limit = conf.getint("scheduler",
"scheduler_zombie_task_threshold")
+ else:
+ heartbeat_time_limit = local_task_job_heartbeat_sec
# LocalTaskJob should not run callbacks, which are handled by
TaskInstance._run_raw_task
# 1, LocalTaskJob does not parse DAG, thus cannot run callbacks