This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e9cc14808b0 Introducing worker configs for task sdk (#48610)
e9cc14808b0 is described below

commit e9cc14808b0f91c33309fe0842e5c8375ec2122f
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Apr 1 17:20:43 2025 +0530

    Introducing worker configs for task sdk (#48610)
---
 .../src/airflow/config_templates/config.yml        | 36 ++++++++++++++++++++++
 task-sdk/src/airflow/sdk/api/client.py             | 11 +++----
 .../src/airflow/sdk/execution_time/supervisor.py   | 10 +++---
 3 files changed, 44 insertions(+), 13 deletions(-)

diff --git a/airflow-core/src/airflow/config_templates/config.yml 
b/airflow-core/src/airflow/config_templates/config.yml
index 24d769c11ec..e2d8ca4f0c6 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1428,6 +1428,42 @@ workers:
       sensitive: true
       example: ~
       default: ""
+    min_heartbeat_interval:
+      description: |
+        The minimum interval (in seconds) at which the worker checks the task 
instance's
+        heartbeat status with the API server to confirm it is still alive.
+      version_added: 3.0.0
+      type: integer
+      example: ~
+      default: "5"
+    max_failed_heartbeats:
+      description: |
+        The maximum number of consecutive failed heartbeats before terminating 
the task instance process.
+      version_added: 3.0.0
+      type: integer
+      example: ~
+      default: "3"
+    execution_api_retries:
+      description: |
+        The maximum number of retry attempts to the execution API server.
+      version_added: 3.0.0
+      type: integer
+      example: ~
+      default: "5"
+    execution_api_retry_wait_min:
+      description: |
+        The minimum amount of time (in seconds) to wait before retrying a 
failed API request.
+      version_added: 3.0.0
+      type: float
+      example: ~
+      default: "1.0"
+    execution_api_retry_wait_max:
+      description: |
+        The maximum amount of time (in seconds) to wait before retrying a 
failed API request.
+      version_added: 3.0.0
+      type: float
+      example: ~
+      default: "90.0"
 api_auth:
   description: Settings relating to authentication on the Airflow APIs
   options:
diff --git a/task-sdk/src/airflow/sdk/api/client.py 
b/task-sdk/src/airflow/sdk/api/client.py
index d205eb28d55..5fd784fc4ee 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -18,7 +18,6 @@
 from __future__ import annotations
 
 import logging
-import os
 import sys
 import uuid
 from http import HTTPStatus
@@ -32,6 +31,7 @@ from retryhttp import retry, wait_retry_after
 from tenacity import before_log, wait_random_exponential
 from uuid6 import uuid7
 
+from airflow.configuration import conf
 from airflow.sdk import __version__
 from airflow.sdk.api.datamodels._generated import (
     API_VERSION,
@@ -489,13 +489,10 @@ def noop_handler(request: httpx.Request) -> 
httpx.Response:
     return httpx.Response(200, json={"text": "Hello, world!"})
 
 
-# Config options for SDK how retries on HTTP requests should be handled
 # Note: Given defaults make attempts after 1, 3, 7, 15 and fails after 31 seconds
-# So far there is no other config facility in SDK we use ENV for the moment
-# TODO: Consider these env variables while handling airflow confs in task sdk
-API_RETRIES = int(os.getenv("AIRFLOW__WORKERS__API_RETRIES", 5))
-API_RETRY_WAIT_MIN = float(os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MIN", 
1.0))
-API_RETRY_WAIT_MAX = float(os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MAX", 
90.0))
+API_RETRIES = conf.getint("workers", "execution_api_retries")
+API_RETRY_WAIT_MIN = conf.getfloat("workers", "execution_api_retry_wait_min")
+API_RETRY_WAIT_MAX = conf.getfloat("workers", "execution_api_retry_wait_max")
 
 
 class Client(httpx.Client):
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py 
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index b93aff512ab..860756fa06a 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -52,6 +52,7 @@ import psutil
 import structlog
 from pydantic import TypeAdapter
 
+from airflow.configuration import conf
 from airflow.sdk.api.client import Client, ServerResponseError
 from airflow.sdk.api.datamodels._generated import (
     AssetResponse,
@@ -109,13 +110,10 @@ __all__ = ["ActivitySubprocess", "WatchedSubprocess", 
"supervise"]
 
 log: FilteringBoundLogger = structlog.get_logger(logger_name="supervisor")
 
-# TODO: Pull this from config
-#  (previously `[scheduler] task_instance_heartbeat_sec` with the following as 
fallback if it is 0:
-#  `[scheduler] task_instance_heartbeat_timeout`)
-HEARTBEAT_TIMEOUT: int = 30
+HEARTBEAT_TIMEOUT: int = conf.getint("scheduler", 
"task_instance_heartbeat_timeout")
 # Don't heartbeat more often than this
-MIN_HEARTBEAT_INTERVAL: int = 5
-MAX_FAILED_HEARTBEATS: int = 3
+MIN_HEARTBEAT_INTERVAL: int = conf.getint("workers", "min_heartbeat_interval")
+MAX_FAILED_HEARTBEATS: int = conf.getint("workers", "max_failed_heartbeats")
 
 # These are the task instance states that require some additional information 
to transition into.
 # "Directly" here means that the PATCH API calls to transition into these 
states are

Reply via email to