tejasvichudasma-zefr opened a new issue, #58499: `ReferenceError` in API Server during Dynamic Mapping of Nested TaskGroups
URL: https://github.com/apache/airflow/issues/58499
### Apache Airflow version
3.1.3
### If "Other Airflow 2/3 version" selected, which one?
_No response_
### What happened?
### Description
When executing a DAG that uses **dynamic task mapping** with **nested
TaskGroups**, the Airflow API Server crashes due to a `ReferenceError` when the
worker attempts to start the dynamically mapped tasks.
The issue arises in the logic that determines upstream mapped indexes when processing task-instance start requests for tasks inside nested mapped TaskGroups. The crash causes the API call to fail, so the worker receives a `ServerResponseError`.
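For orientation, the failing code path seems to be exercised only by the combination of `.expand()` on a TaskGroup that itself contains nested TaskGroups. A minimal sketch of that shape (names here are illustrative only; the full reproduction DAG is further below):
```python
# Minimal illustration of the triggering pattern: a dynamically mapped
# TaskGroup that contains a nested TaskGroup. Names are illustrative.
from airflow.sdk import dag, task, task_group


@dag(schedule=None)
def nested_mapped_groups():
    @task
    def fan_out():
        return [1, 2, 3]

    @task_group
    def inner(x):
        @task
        def work(x):
            return x

        work(x)

    @task_group
    def outer(x):
        inner(x)

    # Mapping the outer group makes the nested group "mapped" as well,
    # which is what the server must resolve when computing upstream map
    # indexes at task start.
    outer.expand(x=fan_out())


nested_mapped_groups()
```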
### Environment
* **Airflow Version:** 3.1.2
* **Deployment:** Kubernetes
* **Executor:** Celery, Kubernetes
* **Python Version:** 3.12 (as indicated by the traceback paths)
### Observed Behavior
#### 1. Worker/Scheduler Logs (Client Side)
The worker shows repeated retries to connect to the API server before
receiving a fatal `ServerResponseError`. The `exit_code: -9` confirms the
process being monitored (the task execution process) was **killed by the
operating system** (`SIGKILL`).
```
2025-11-19T19:16:29.661325Z [warning ] Starting call to 'airflow.sdk.api.client.Client.request', this is the 4th time calling it. [airflow.sdk.api.client] loc=before.py:42
2025-11-19T19:16:31.821107Z [info ] Process exited [supervisor] exit_code=-9 loc=supervisor.py:709 pid=18700 signal_sent=SIGKILL
2025-11-19T19:16:31.835657Z [error ] Task execute_workload[a200540d-44f9-4d1e-a9fb-d4f97954747e] raised unexpected: ServerResponseError('Server returned error') [celery.app.trace] loc=trace.py:267
... traceback leading to:
airflow.sdk.api.client.ServerResponseError: Server returned error
```
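As an aside, the `-9` follows the convention Python's `subprocess` layer uses for signal deaths: a process killed by signal N is reported with return code `-N`. A quick standalone check (POSIX only, not Airflow code):
```python
# Standalone check (not Airflow code): a process killed by SIGKILL is
# reported with a negative return code equal to the signal number.
import signal
import subprocess

p = subprocess.run(["sh", "-c", "kill -9 $$"])
assert p.returncode == -signal.SIGKILL  # -9, matching exit_code=-9 above
```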
#### 2. Airflow API Server Logs (Server Side - **Critical Error**)
The API server logs show a `ReferenceError` occurring within the logic that
resolves mapped task dependencies, specifically related to iterating over
TaskGroups in a serialized object.
```
INFO:     10.156.192.15:48546 - "POST /api/v2/dags/load_testing_dag/clearTaskInstances HTTP/1.1" 200 OK
2025-11-19T19:28:33.889624Z [info ] Task started [airflow.api_fastapi.execution_api.routes.task_instances] hostname=* loc=task_instances.py:199 previous_state=queued ti_id=019a9d97-07bb-71a7-b87b-5658239406c5
2025-11-19T19:28:33.891432Z [info ] Task started [airflow.api_fastapi.execution_api.routes.task_instances] hostname=* loc=task_instances.py:199 previous_state=queued ti_id=019a9d97-07ab-76cd-8bdc-a11067a69f39
2025-11-19T19:28:33.891842Z [info ] Task instance state updated [airflow.api_fastapi.execution_api.routes.task_instances] loc=task_instances.py:212 rows_affected=1 ti_id=019a9d97-07bb-71a7-b87b-5658239406c5
2025-11-19T19:28:33.895775Z [info ] Task instance state updated [airflow.api_fastapi.execution_api.routes.task_instances] loc=task_instances.py:212 rows_affected=1 ti_id=019a9d97-07ab-76cd-8bdc-a11067a69f39
2025-11-19T19:28:33.899396Z [info ] Task started [airflow.api_fastapi.execution_api.routes.task_instances] hostname=* loc=task_instances.py:199 previous_state=queued ti_id=019a9d97-081a-7d5f-9b19-00ae6b03db37
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/fastapi/routing.py", line 219, in run_endpoint_function
    return await dependant.call(**values)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/cadwyn/structure/versions.py", line 474, in decorator
    response = await self._convert_endpoint_response_to_version(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/cadwyn/structure/versions.py", line 520, in _convert_endpoint_response_to_version
    response_or_response_body: Union[FastapiResponse, object] = await run_in_threadpool(
                                                                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/starlette/concurrency.py", line 38, in run_in_threadpool
    return await anyio.to_thread.run_sync(func)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 2485, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 976, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/cadwyn/schema_generation.py", line 515, in __call__
    return self._original_callable(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/api_fastapi/execution_api/routes/task_instances.py", line 256, in ti_run
    upstream_map_indexes = dict(
                           ^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/api_fastapi/execution_api/routes/task_instances.py", line 303, in _get_upstream_map_indexes
    elif task.get_closest_mapped_task_group() == upstream_mapped_group:
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 2272, in get_closest_mapped_task_group
    return next(self.iter_mapped_task_groups(), None)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 2263, in iter_mapped_task_groups
    yield from group.iter_mapped_task_groups()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ReferenceError: weakly-referenced object no longer exists
```
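For what it's worth, the error message is exactly what `weakref.proxy` raises once its referent has been garbage-collected, which suggests a group backreference in the serialized objects (presumably held weakly to avoid reference cycles) outlived the object it pointed at. A standalone demonstration of that failure mode, not Airflow code:
```python
# Standalone demonstration (not Airflow code): attribute access through a
# weakref.proxy whose referent has been collected raises this exact error.
import weakref


class Group:
    def iter_mapped_task_groups(self):
        yield self


group = Group()
proxy = weakref.proxy(group)  # e.g. a parent-group backreference held weakly
del group                     # referent collected (CPython refcounting)
try:
    next(proxy.iter_mapped_task_groups())
except ReferenceError as e:
    print(e)  # "weakly-referenced object no longer exists"
```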
### What you think should happen instead?
All tasks under the TaskGroup should complete without errors.
### How to reproduce
Here is a sample DAG. If the error does not show up right away, update the code to create a new version of the DAG, then clear a task inside a TaskGroup and wait for a task in the next TaskGroup to fail.
```python
from functools import partial
import random
from time import sleep
from datetime import datetime, timezone, timedelta

from airflow.sdk import dag, task, task_group

EXECUTOR_TYPE = "CeleryExecutor"


@dag(
    schedule=None,
    start_date=datetime.now(timezone.utc) - timedelta(days=1),
    catchup=False,
    description=f"DAG mixing celery and {EXECUTOR_TYPE}s using Taskflow API",
    tags=["example", "executor", "kubernetes", "celery"],
)
def load_testing_dag():
    # Change EXECUTOR_TYPE above to switch executors
    @task(executor="CeleryExecutor")
    def compute_load_task():
        print("This task is running with the CeleryExecutor!")
        sleep(60)  # Simulate some load computation
        return "content IDs found"

    @task(executor="CeleryExecutor")
    def partition_task():
        print("This task is running with the CeleryExecutor!")
        sleep(1)  # Simulate some load computation
        return [random.randint(10, 60) for _ in range(50)]

    @task(executor=EXECUTOR_TYPE)
    def load_task():
        print(f"This task is running with the {EXECUTOR_TYPE}!")
        sleep(120)  # Simulate some load computation
        return "content IDs loaded"

    @task(executor=EXECUTOR_TYPE)
    def model_process(input_uri):
        print(f"Processing partition {input_uri} with {EXECUTOR_TYPE}")
        sleep(input_uri)  # Simulate processing time
        return random.randint(60, 120)

    def inference():
        @task_group(group_id="inference_tier1_task_group")
        def tg(input_uri):
            @task_group(group_id="features")
            def features(input_uri):
                @task(executor=EXECUTOR_TYPE)
                def metadata_task(input_uri):
                    print(f"Extracting features from {input_uri} with {EXECUTOR_TYPE}")
                    sleep(input_uri)  # Simulate processing time
                    return random.randint(60, 120)

                @task(executor=EXECUTOR_TYPE)
                def frame_extraction_task(input_uri):
                    print(f"Extracting features from {input_uri} with {EXECUTOR_TYPE}")
                    sleep(input_uri)  # Simulate processing time
                    return random.randint(60, 120)

                metadata_task_output = metadata_task(input_uri)
                frame_extraction_task_output = frame_extraction_task(input_uri=metadata_task_output)
                return {"features": frame_extraction_task_output}

            @task_group(group_id="models")
            def process_models(input_uri):
                models = []
                for model in ["model_a", "model_b", "model_c"]:
                    models.append(partial(model_process.override(task_id=model))(input_uri))
                return models

            @task_group(group_id="aggregation")
            def aggregation(input_uri):
                @task(executor=EXECUTOR_TYPE)
                def aggregate_task(input_uri):
                    print(f"Aggregating results from {input_uri} with {EXECUTOR_TYPE}")
                    sleep(10)  # Simulate processing time
                    return "aggregation complete"

                return aggregate_task(input_uri)

            feature_outputs = features(input_uri=input_uri)
            model_output_uris = process_models(input_uri=feature_outputs["features"])
            aggregation_task = aggregation(input_uri=model_output_uris)
            return aggregation_task

        return tg

    inference_tier_1 = inference()
    compute = compute_load_task()
    partition = partition_task()

    compute >> partition
    inference_tier_1.expand(input_uri=partition) >> load_task()


load_testing_dag()
```
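The clear step from the reproduction instructions corresponds to the `clearTaskInstances` call visible in the server log above. A hedged sketch of driving it through the REST API (the body fields and auth handling are assumptions carried over from the stable REST API, not verified against `/api/v2`):
```python
# Hedged sketch of the "clear a task inside the TaskGroup" step via the
# endpoint seen in the server log. URL, token, run id, and body fields are
# assumptions, not verified against the /api/v2 schema.
import requests

AIRFLOW_URL = "http://localhost:8080"  # assumption: local API server
TOKEN = "..."                          # assumption: a valid access token

resp = requests.post(
    f"{AIRFLOW_URL}/api/v2/dags/load_testing_dag/clearTaskInstances",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={
        "dry_run": False,
        "dag_run_id": "<run id>",  # placeholder
        # a task inside the mapped task group, per the repro steps
        "task_ids": ["inference_tier1_task_group.features.metadata_task"],
    },
    timeout=30,
)
resp.raise_for_status()
```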
### Operating System
Debian GNU/Linux 12 (bookworm)
### Versions of Apache Airflow Providers
- apache-airflow-providers-celery==3.13.1
- apache-airflow-providers-cncf-kubernetes==10.10.0
- apache-airflow-providers-common-compat==1.8.0
- apache-airflow-providers-common-io==1.6.4
- apache-airflow-providers-common-sql==1.28.2
- apache-airflow-providers-fab==3.0.2
- apache-airflow-providers-git==0.1.0
- apache-airflow-providers-google==19.0.0
- apache-airflow-providers-http==5.5.0
- apache-airflow-providers-postgres==6.4.1
- apache-airflow-providers-redis==4.3.3
- apache-airflow-providers-smtp==2.3.1
- apache-airflow-providers-snowflake==6.6.1
- apache-airflow-providers-standard==1.9.1
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else?
This is an intermittent problem, but once the error occurs, all DAGs start failing.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)