This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e9cc14808b0 Introducing worker configs for task sdk (#48610)
e9cc14808b0 is described below
commit e9cc14808b0f91c33309fe0842e5c8375ec2122f
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Apr 1 17:20:43 2025 +0530
Introducing worker configs for task sdk (#48610)
---
.../src/airflow/config_templates/config.yml | 36 ++++++++++++++++++++++
task-sdk/src/airflow/sdk/api/client.py | 11 +++----
.../src/airflow/sdk/execution_time/supervisor.py | 10 +++---
3 files changed, 44 insertions(+), 13 deletions(-)
diff --git a/airflow-core/src/airflow/config_templates/config.yml
b/airflow-core/src/airflow/config_templates/config.yml
index 24d769c11ec..e2d8ca4f0c6 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1428,6 +1428,42 @@ workers:
sensitive: true
example: ~
default: ""
+ min_heartbeat_interval:
+ description: |
+ The minimum interval (in seconds) at which the worker checks the task
instance's
+ heartbeat status with the API server to confirm it is still alive.
+ version_added: 3.0.0
+ type: integer
+ example: ~
+ default: "5"
+ max_failed_heartbeats:
+ description: |
+ The maximum number of consecutive failed heartbeats before terminating
the task instance process.
+ version_added: 3.0.0
+ type: integer
+ example: ~
+ default: "3"
+ execution_api_retries:
+ description: |
+ The maximum number of retry attempts to the execution API server.
+ version_added: 3.0.0
+ type: integer
+ example: ~
+ default: "5"
+ execution_api_retry_wait_min:
+ description: |
+ The minimum amount of time (in seconds) to wait before retrying a
failed API request.
+ version_added: 3.0.0
+ type: float
+ example: ~
+ default: "1.0"
+ execution_api_retry_wait_max:
+ description: |
+ The maximum amount of time (in seconds) to wait before retrying a
failed API request.
+ version_added: 3.0.0
+ type: float
+ example: ~
+ default: "90.0"
api_auth:
description: Settings relating to authentication on the Airflow APIs
options:
diff --git a/task-sdk/src/airflow/sdk/api/client.py
b/task-sdk/src/airflow/sdk/api/client.py
index d205eb28d55..5fd784fc4ee 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -18,7 +18,6 @@
from __future__ import annotations
import logging
-import os
import sys
import uuid
from http import HTTPStatus
@@ -32,6 +31,7 @@ from retryhttp import retry, wait_retry_after
from tenacity import before_log, wait_random_exponential
from uuid6 import uuid7
+from airflow.configuration import conf
from airflow.sdk import __version__
from airflow.sdk.api.datamodels._generated import (
API_VERSION,
@@ -489,13 +489,10 @@ def noop_handler(request: httpx.Request) ->
httpx.Response:
return httpx.Response(200, json={"text": "Hello, world!"})
-# Config options for SDK how retries on HTTP requests should be handled
# Note: Given defaults make attempts after 1, 3, 7, 15 and fails after
31 seconds
-# So far there is no other config facility in SDK we use ENV for the moment
-# TODO: Consider these env variables while handling airflow confs in task sdk
-API_RETRIES = int(os.getenv("AIRFLOW__WORKERS__API_RETRIES", 5))
-API_RETRY_WAIT_MIN = float(os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MIN",
1.0))
-API_RETRY_WAIT_MAX = float(os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MAX",
90.0))
+API_RETRIES = conf.getint("workers", "execution_api_retries")
+API_RETRY_WAIT_MIN = conf.getfloat("workers", "execution_api_retry_wait_min")
+API_RETRY_WAIT_MAX = conf.getfloat("workers", "execution_api_retry_wait_max")
class Client(httpx.Client):
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index b93aff512ab..860756fa06a 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -52,6 +52,7 @@ import psutil
import structlog
from pydantic import TypeAdapter
+from airflow.configuration import conf
from airflow.sdk.api.client import Client, ServerResponseError
from airflow.sdk.api.datamodels._generated import (
AssetResponse,
@@ -109,13 +110,10 @@ __all__ = ["ActivitySubprocess", "WatchedSubprocess",
"supervise"]
log: FilteringBoundLogger = structlog.get_logger(logger_name="supervisor")
-# TODO: Pull this from config
-# (previously `[scheduler] task_instance_heartbeat_sec` with the following as
fallback if it is 0:
-# `[scheduler] task_instance_heartbeat_timeout`)
-HEARTBEAT_TIMEOUT: int = 30
+HEARTBEAT_TIMEOUT: int = conf.getint("scheduler",
"task_instance_heartbeat_timeout")
# Don't heartbeat more often than this
-MIN_HEARTBEAT_INTERVAL: int = 5
-MAX_FAILED_HEARTBEATS: int = 3
+MIN_HEARTBEAT_INTERVAL: int = conf.getint("workers", "min_heartbeat_interval")
+MAX_FAILED_HEARTBEATS: int = conf.getint("workers", "max_failed_heartbeats")
# These are the task instance states that require some additional information
to transition into.
# "Directly" here means that the PATCH API calls to transition into these
states are