GitHub user saaifali created a discussion: Airflow Dynamic Task Fails with httpx Timeout on an Instance with High Load and Around 300 Active DAGs

- Airflow version: 3.1.1
- Running on Kubernetes in a corporate network
- Active DAGs: around 300
- Deployment:
  - 2 API servers with one worker each
  - 1 DAG processor
  - 8 schedulers
  - 1 triggerer

Each component has limits of around 4 CPUs and 20 GB of memory.

I have a DAG that uses dynamic task mapping. It also uses `.map()` in between to transform the output of my first task into the format the dynamically mapped tasks expect.
This DAG runs perfectly fine on a dev server with fewer DAGs and low load.

When I moved it to production for testing, my setup task, which decides which dynamically mapped task to run, completes successfully, but the two downstream tasks (one running on the KubernetesExecutor and the other on the LocalExecutor) keep failing at the same step.
This is the step where they fetch XComs from the API server, and the request times out with `httpx.ReadTimeout: timed out`.
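One way to check whether the API server itself is slow to answer under production load is to time a few plain HTTP requests against it from inside a worker pod. The following is a minimal, stdlib-only sketch; `probe_latency` is a hypothetical helper, and it does not reproduce the authenticated XCom request that the task SDK client makes, so it only measures raw round-trip time to whatever URL you give it.

```python
from __future__ import annotations

import time
import urllib.error
import urllib.request


def probe_latency(url: str, attempts: int = 5, timeout: float = 5.0) -> list[float | None]:
    """Time repeated GET requests to `url`.

    Returns one entry per attempt: elapsed seconds if the server sent any
    response (including HTTP errors such as 401), or None if the request
    timed out or the connection failed.
    """
    results: list[float | None] = []
    for _ in range(attempts):
        start = time.perf_counter()
        try:
            with urllib.request.urlopen(url, timeout=timeout) as resp:
                resp.read()
            results.append(time.perf_counter() - start)
        except urllib.error.HTTPError as err:
            # The server answered, just not with a 2xx status: still a latency sample.
            err.close()
            results.append(time.perf_counter() - start)
        except OSError:
            # URLError, TimeoutError and connection errors all derive from OSError.
            results.append(None)
    return results
```

For example, `probe_latency("http://airflow-api-server:8080/", attempts=10)` (the host name is a hypothetical placeholder for your in-cluster API server URL) run on the dev and production instances would show whether response times differ sharply under load.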

The relevant DAG code:
```python
    # Step 1: determine which job (if any) should run this DAG run.
    #         Returns a list of 0 or 1 job-descriptor dicts.
    job_list = cgs_jobs_scheduler()

    k8s_cmd = job_list.map(_k8s_cmd_filter)
    local_cmd = job_list.map(_local_cmd_filter)
    # Step 3a: .map() extracts the bash_command for k8s jobs only.
    #          If the job is local, _k8s_cmd_filter raises AirflowSkipException
    #          → that mapped instance is skipped.
    #          0 items in job_list → 0 mapped instances (task skipped entirely).
    k8s_job = BashOperator.partial(
        task_id='k8s_job',
        # map_index_template="""{{ task.bash_command.split('/')[-1].replace('.sh', '') }}""",
        priority_weight=1,
        env={
            "AIRFLOW_TASK_URL": "{{ task_instance.log_url }}",
            "XDV_USER": "{{ params.jobs_to_trigger[2] if params.jobs_to_trigger|length > 2 else '' }}",
            "WATERFALL_TYPE": "{{ params.WATERFALL_TYPE if 'WATERFALL_TYPE' in params else '' }}",
            "DV_RELEASE_KEY": "{{ params.DV_RELEASE_KEY if 'DV_RELEASE_KEY' in params else '' }}",
        },
        append_env=True,
        executor='KubernetesExecutor',
    ).expand(bash_command=k8s_cmd)

    # Step 3b: .map() extracts the bash_command for local jobs only.
    #          If the job is k8s, _local_cmd_filter raises AirflowSkipException
    #          → that mapped instance is skipped.
    local_job = BashOperator.partial(
        task_id='local_job',
        # map_index_template="""{{ task.bash_command.split('/')[-1].replace('.sh', '') }}""",
        priority_weight=1,
        env={
            "AIRFLOW_TASK_URL": "{{ task_instance.log_url }}",
            "XDV_USER": "{{ params.jobs_to_trigger[2] if params.jobs_to_trigger|length > 2 else '' }}",
            "WATERFALL_TYPE": "{{ params.WATERFALL_TYPE if 'WATERFALL_TYPE' in params else '' }}",
            "DV_RELEASE_KEY": "{{ params.DV_RELEASE_KEY if 'DV_RELEASE_KEY' in params else '' }}",
        },
        append_env=True,
        executor='LocalExecutor',
    ).expand(bash_command=local_cmd)
```

The error in question: 
```
Starting call to 'airflow.sdk.api.client.Client.request', this is the 4th time calling it.
[2026-04-02T17:38:35.849180Z] {retries.py:99} DEBUG - Running DagRun.get_running_dag_runs_to_examine with retries. Try 1 of 5
[2026-04-02T17:38:37.867007Z] {local_executor.py:100} ERROR - uhoh
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/httpx/_transports/default.py", line 101, in map_httpcore_exceptions
    yield
  File "/home/airflow/.local/lib/python3.11/site-packages/httpx/_transports/default.py", line 250, in handle_request
    resp = self._pool.handle_request(req)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/httpcore/_sync/connection_pool.py", line 256, in handle_request
    raise exc from None
  File "/home/airflow/.local/lib/python3.11/site-packages/httpcore/_sync/connection_pool.py", line 236, in handle_request
    response = connection.handle_request(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/httpcore/_sync/connection.py", line 103, in handle_request
    return self._connection.handle_request(request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/httpcore/_sync/http11.py", line 136, in handle_request
    raise exc
  File "/home/airflow/.local/lib/python3.11/site-packages/httpcore/_sync/http11.py", line 106, in handle_request
    ) = self._receive_response_headers(**kwargs)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/httpcore/_sync/http11.py", line 177, in _receive_response_headers
    event = self._receive_event(timeout=timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/httpcore/_sync/http11.py", line 217, in _receive_event
    data = self._network_stream.read(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/httpcore/_backends/sync.py", line 126, in read
    with map_exceptions(exc_map):
  File "/usr/python/lib/python3.11/contextlib.py", line 158, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/airflow/.local/lib/python3.11/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc) from exc
httpcore.ReadTimeout: timed out

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/executors/local_executor.py", line 96, in _run_worker
    _execute_work(log, workload)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/executors/local_executor.py", line 124, in _execute_work
    supervise(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 1935, in supervise
    exit_code = process.wait()
                ^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 990, in wait
    self._monitor_subprocess()
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 1055, in _monitor_subprocess
    alive = self._service_subprocess(max_wait_time=max_wait_time) is None
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 771, in _service_subprocess
    need_more = socket_handler(key.fileobj)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 1713, in cb
    gen.send(request)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 612, in handle_requests
    self._handle_request(msg, log, request.id)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 1216, in _handle_request
    xcom = self.client.xcoms.get(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sdk/api/client.py", line 448, in get
    resp = self.client.get(f"xcoms/{dag_id}/{run_id}/{task_id}/{key}", params=params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/httpx/_client.py", line 1053, in get
    return self.request(
           ^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 338, in wrapped_f
    return copy(f, *args, **kw)
           ^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 477, in __call__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 378, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 420, in exc_check
    raise retry_exc.reraise()
          ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 187, in reraise
    raise self.last_attempt.result()
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 480, in __call__
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/sdk/api/client.py", line 866, in request
    return super().request(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/httpx/_client.py", line 825, in request
    return self.send(request, auth=auth, follow_redirects=follow_redirects)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/httpx/_client.py", line 914, in send
    response = self._send_handling_auth(
               ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/httpx/_client.py", line 942, in _send_handling_auth
    response = self._send_handling_redirects(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/httpx/_client.py", line 979, in _send_handling_redirects
    response = self._send_single_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/httpx/_client.py", line 1014, in _send_single_request
    response = transport.handle_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/httpx/_transports/default.py", line 249, in handle_request
    with map_httpcore_exceptions():
  File "/usr/python/lib/python3.11/contextlib.py", line 158, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/airflow/.local/lib/python3.11/site-packages/httpx/_transports/default.py", line 118, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ReadTimeout: timed out
```
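The log line "Starting call to 'airflow.sdk.api.client.Client.request', this is the 4th time calling it." and the `tenacity` frames indicate that the XCom GET is wrapped in retries, and that `httpx.ReadTimeout` is only re-raised after the final attempt fails. The following stdlib-only sketch illustrates that retry-with-exponential-backoff pattern; `call_with_retries` is a hypothetical helper, and the attempt count, delays, and use of `TimeoutError` in place of `httpx.ReadTimeout` are illustrative assumptions, not Airflow's actual retry settings.

```python
from __future__ import annotations

import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    retry_on: tuple[type[BaseException], ...] = (TimeoutError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on `retry_on` exceptions with exponential backoff.

    The delay doubles after each failed attempt (capped at max_delay).
    When the last attempt also fails, its exception is re-raised, so the
    caller only sees the timeout after every retry has been used up.
    """
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise
            sleep(min(max_delay, base_delay * 2 ** (attempt - 1)))
    raise AssertionError("unreachable: attempts must be >= 1")
```

If every attempt reaches an API server that is persistently slower than the client timeout, backoff only delays the failure; the retries themselves are unlikely to be the fix.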

I am at my wits' end on how to resolve this issue.
Any ideas, suggestions, prayers, spells, etc. will be appreciated. 💯

GitHub link: https://github.com/apache/airflow/discussions/64638
