GitHub user saaifali created a discussion: Airflow dynamic task fails with an
httpx timeout on an instance with high load and around 300 active DAGs.
Airflow version: 3.1.1
Running on Kubernetes in a corporate network
Active DAGs: around 300
Deployment:
- 2 API servers, with one worker each
- 1 DAG processor
- 8 schedulers
- 1 triggerer
Each component has limits of around 4 CPU and 20 GB of memory.
I have a particular DAG set up with Dynamic Task Mapping, which also uses
`.map` in between to transform the output of my first task into the proper
format for the dynamically mapped tasks.
This DAG ran perfectly fine on a dev server with fewer DAGs and low load.
When I moved it to production for testing, my setup task, which decides which
dynamically mapped task to run, completes successfully, but the two downstream
jobs (one running on KubernetesExecutor and the other on LocalExecutor) keep
failing at the same step: the point where the task tries to fetch the XComs
from the API server, where it times out with an httpx `ReadTimeout`.
The relevant DAG code:
```
# Step 1 — determine which job (if any) should run this DAG run.
# Returns a list of 0 or 1 job-descriptor dicts.
job_list = cgs_jobs_scheduler()

k8s_cmd = job_list.map(_k8s_cmd_filter)
local_cmd = job_list.map(_local_cmd_filter)

# Step 3a — .map() extracts the bash_command for k8s jobs only.
# If the job is local, _k8s_cmd_filter raises AirflowSkipException
# → that mapped instance is skipped.
# 0 items in job_list → 0 mapped instances (task skipped entirely).
k8s_job = BashOperator.partial(
    task_id='k8s_job',
    # map_index_template="""{{ task.bash_command.split('/')[-1].replace('.sh', '') }}""",
    priority_weight=1,
    env={
        "AIRFLOW_TASK_URL": "{{ task_instance.log_url }}",
        "XDV_USER": "{{ params.jobs_to_trigger[2] if params.jobs_to_trigger|length > 2 else '' }}",
        "WATERFALL_TYPE": "{{ params.WATERFALL_TYPE if 'WATERFALL_TYPE' in params else '' }}",
        "DV_RELEASE_KEY": "{{ params.DV_RELEASE_KEY if 'DV_RELEASE_KEY' in params else '' }}",
    },
    append_env=True,
    executor='KubernetesExecutor',
).expand(bash_command=k8s_cmd)

# Step 3b — .map() extracts the bash_command for local jobs only.
# If the job is k8s, _local_cmd_filter raises AirflowSkipException
# → that mapped instance is skipped.
local_job = BashOperator.partial(
    task_id='local_job',
    # map_index_template="""{{ task.bash_command.split('/')[-1].replace('.sh', '') }}""",
    priority_weight=1,
    env={
        "AIRFLOW_TASK_URL": "{{ task_instance.log_url }}",
        "XDV_USER": "{{ params.jobs_to_trigger[2] if params.jobs_to_trigger|length > 2 else '' }}",
        "WATERFALL_TYPE": "{{ params.WATERFALL_TYPE if 'WATERFALL_TYPE' in params else '' }}",
        "DV_RELEASE_KEY": "{{ params.DV_RELEASE_KEY if 'DV_RELEASE_KEY' in params else '' }}",
    },
    append_env=True,
    executor='LocalExecutor',
).expand(bash_command=local_cmd)
```
The error in question:
```
Starting call to 'airflow.sdk.api.client.Client.request', this is the 4th time calling it.
[2026-04-02T17:38:35.849180Z] {retries.py:99} DEBUG - Running DagRun.get_running_dag_runs_to_examine with retries. Try 1 of 5
[2026-04-02T17:38:37.867007Z] {local_executor.py:100} ERROR - uhoh
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/httpx/_transports/default.py", line 101, in map_httpcore_exceptions
    yield
  File "/home/airflow/.local/lib/python3.11/site-packages/httpx/_transports/default.py", line 250, in handle_request
    resp = self._pool.handle_request(req)
  File "/home/airflow/.local/lib/python3.11/site-packages/httpcore/_sync/connection_pool.py", line 256, in handle_request
    raise exc from None
  File "/home/airflow/.local/lib/python3.11/site-packages/httpcore/_sync/connection_pool.py", line 236, in handle_request
    response = connection.handle_request(
  File "/home/airflow/.local/lib/python3.11/site-packages/httpcore/_sync/connection.py", line 103, in handle_request
    return self._connection.handle_request(request)
  File "/home/airflow/.local/lib/python3.11/site-packages/httpcore/_sync/http11.py", line 136, in handle_request
    raise exc
  File "/home/airflow/.local/lib/python3.11/site-packages/httpcore/_sync/http11.py", line 106, in handle_request
    ) = self._receive_response_headers(**kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/httpcore/_sync/http11.py", line 177, in _receive_response_headers
    event = self._receive_event(timeout=timeout)
  File "/home/airflow/.local/lib/python3.11/site-packages/httpcore/_sync/http11.py", line 217, in _receive_event
    data = self._network_stream.read(
  File "/home/airflow/.local/lib/python3.11/site-packages/httpcore/_backends/sync.py", line 126, in read
    with map_exceptions(exc_map):
  File "/usr/python/lib/python3.11/contextlib.py", line 158, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/airflow/.local/lib/python3.11/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc) from exc
httpcore.ReadTimeout: timed out

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/executors/local_executor.py", line 96, in _run_worker
    _execute_work(log, workload)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/executors/local_executor.py", line 124, in _execute_work
    supervise(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 1935, in supervise
    exit_code = process.wait()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 990, in wait
    self._monitor_subprocess()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 1055, in _monitor_subprocess
    alive = self._service_subprocess(max_wait_time=max_wait_time) is None
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 771, in _service_subprocess
    need_more = socket_handler(key.fileobj)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 1713, in cb
    gen.send(request)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 612, in handle_requests
    self._handle_request(msg, log, request.id)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 1216, in _handle_request
    xcom = self.client.xcoms.get(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sdk/api/client.py", line 448, in get
    resp = self.client.get(f"xcoms/{dag_id}/{run_id}/{task_id}/{key}", params=params)
  File "/home/airflow/.local/lib/python3.11/site-packages/httpx/_client.py", line 1053, in get
    return self.request(
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 338, in wrapped_f
    return copy(f, *args, **kw)
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 477, in __call__
    do = self.iter(retry_state=retry_state)
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 378, in iter
    result = action(retry_state)
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 420, in exc_check
    raise retry_exc.reraise()
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 187, in reraise
    raise self.last_attempt.result()
  File "/usr/python/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
  File "/usr/python/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 480, in __call__
    result = fn(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sdk/api/client.py", line 866, in request
    return super().request(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.11/site-packages/httpx/_client.py", line 825, in request
    return self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/home/airflow/.local/lib/python3.11/site-packages/httpx/_client.py", line 914, in send
    response = self._send_handling_auth(
  File "/home/airflow/.local/lib/python3.11/site-packages/httpx/_client.py", line 942, in _send_handling_auth
    response = self._send_handling_redirects(
  File "/home/airflow/.local/lib/python3.11/site-packages/httpx/_client.py", line 979, in _send_handling_redirects
    response = self._send_single_request(request)
  File "/home/airflow/.local/lib/python3.11/site-packages/httpx/_client.py", line 1014, in _send_single_request
    response = transport.handle_request(request)
  File "/home/airflow/.local/lib/python3.11/site-packages/httpx/_transports/default.py", line 249, in handle_request
    with map_httpcore_exceptions():
  File "/usr/python/lib/python3.11/contextlib.py", line 158, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/airflow/.local/lib/python3.11/site-packages/httpx/_transports/default.py", line 118, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ReadTimeout: timed out
```
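For context, the log shows the SDK client already wraps the XCom request in retries ("this is the 4th time calling it"), so the API server is timing out repeatedly, not just once, before the task is failed. The pattern is tenacity-style retry-on-exception with reraise; a stdlib-only sketch of that behavior (the names, attempt count, and backoff here are illustrative, not Airflow's actual configuration):

```python
import time


def retrying(fn, attempts=5, base_wait=0.0):
    """Re-invoke fn until it succeeds or attempts run out, then re-raise
    the last exception (like tenacity with reraise=True)."""
    last_exc = None
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception as exc:
            last_exc = exc
            if attempt < attempts:
                time.sleep(base_wait * attempt)  # backoff between attempts
    raise last_exc


calls = []

def flaky():
    calls.append(1)
    raise TimeoutError("timed out")  # every attempt times out, as in the log

try:
    retrying(flaky, attempts=5)
except TimeoutError:
    pass  # only after all attempts fail does the exception surface
```

Once the retries are exhausted, the original exception propagates — that is the `httpx.ReadTimeout` that finally fails the mapped task instances.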
I am at my wits' end on how to resolve this issue.
Any ideas, suggestions, prayers, spells, etc. will be appreciated. 💯
GitHub link: https://github.com/apache/airflow/discussions/64638
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]