tejasvichudasma-zefr opened a new issue, #58499:
URL: https://github.com/apache/airflow/issues/58499

   ### Apache Airflow version
   
   3.1.3
   
   ### If "Other Airflow 2/3 version" selected, which one?
   
   `ReferenceError` in API Server during Dynamic Mapping of Nested TaskGroups
   
   ### What happened?
   
   ### Description
   
   When executing a DAG that uses **dynamic task mapping** with **nested 
TaskGroups**, the Airflow API Server crashes due to a `ReferenceError` when the 
worker attempts to start the dynamically mapped tasks.
   
   The issue specifically arises in the logic for determining upstream mapped 
indexes when processing task instance start requests for tasks within nested 
mapped TaskGroups. The crash leads to the worker receiving a 
`ServerResponseError` after the API call fails.
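
   For context, the `ReferenceError: weakly-referenced object no longer exists` shown in the server traceback below is Python's standard error for dereferencing a `weakref.proxy` whose target has been garbage collected. A minimal, self-contained sketch of that failure mode (plain Python, not Airflow code):

   ```python
   import weakref


   class Group:
       def __init__(self, name):
           self.name = name


   parent = Group("outer")
   proxy = weakref.proxy(parent)  # weak reference: does not keep `parent` alive

   print(proxy.name)  # "outer" -- works while `parent` is alive

   del parent  # drop the last strong reference; CPython collects it immediately
   print(proxy.name)  # ReferenceError: weakly-referenced object no longer exists
   ```

   The traceback suggests the serialized TaskGroup reaches its enclosing group through such a weak reference, and that group has already been collected by the time `iter_mapped_task_groups()` walks up the hierarchy.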
   
   ### Environment
   
     * **Airflow Version:** 3.1.2
     * **Deployment:** Kubernetes
     * **Executor:** Celery, Kubernetes
     * **Python Version:** 3.12 (as indicated by the traceback paths)
   
   ### Observed Behavior
   
   #### 1. Worker/Scheduler Logs (Client Side)
   
   The worker shows repeated retries to connect to the API server before receiving a fatal `ServerResponseError`. The `exit_code: -9` confirms that the monitored process (the task execution process) was **killed by the operating system** (`SIGKILL`).
   
   ```
   2025-11-19T19:16:29.661325Z [warning  ] Starting call to 'airflow.sdk.api.client.Client.request', this is the 4th time calling it. [airflow.sdk.api.client] loc=before.py:42
   2025-11-19T19:16:31.821107Z [info     ] Process exited [supervisor] exit_code=-9 loc=supervisor.py:709 pid=18700 signal_sent=SIGKILL
   2025-11-19T19:16:31.835657Z [error    ] Task execute_workload[a200540d-44f9-4d1e-a9fb-d4f97954747e] raised unexpected: ServerResponseError('Server returned error') [celery.app.trace] loc=trace.py:267
   ... traceback leading to:
   airflow.sdk.api.client.ServerResponseError: Server returned error
   ```
   
   #### 2. Airflow API Server Logs (Server Side - **Critical Error**)
   
   The API server logs show a `ReferenceError` occurring within the logic that resolves mapped task dependencies, specifically while iterating over the TaskGroups of a serialized task.
   
   ```
   INFO:     10.156.192.15:48546 - "POST /api/v2/dags/load_testing_dag/clearTaskInstances HTTP/1.1" 200 OK
   2025-11-19T19:28:33.889624Z [info     ] Task started [airflow.api_fastapi.execution_api.routes.task_instances] hostname=* loc=task_instances.py:199 previous_state=queued ti_id=019a9d97-07bb-71a7-b87b-5658239406c5
   2025-11-19T19:28:33.891432Z [info     ] Task started [airflow.api_fastapi.execution_api.routes.task_instances] hostname=* loc=task_instances.py:199 previous_state=queued ti_id=019a9d97-07ab-76cd-8bdc-a11067a69f39
   2025-11-19T19:28:33.891842Z [info     ] Task instance state updated [airflow.api_fastapi.execution_api.routes.task_instances] loc=task_instances.py:212 rows_affected=1 ti_id=019a9d97-07bb-71a7-b87b-5658239406c5
   2025-11-19T19:28:33.895775Z [info     ] Task instance state updated [airflow.api_fastapi.execution_api.routes.task_instances] loc=task_instances.py:212 rows_affected=1 ti_id=019a9d97-07ab-76cd-8bdc-a11067a69f39
   2025-11-19T19:28:33.899396Z [info     ] Task started [airflow.api_fastapi.execution_api.routes.task_instances] hostname=* loc=task_instances.py:199 previous_state=queued ti_id=019a9d97-081a-7d5f-9b19-00ae6b03db37
   
     File "/home/airflow/.local/lib/python3.12/site-packages/fastapi/routing.py", line 219, in run_endpoint_function
       return await dependant.call(**values)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/cadwyn/structure/versions.py", line 474, in decorator
       response = await self._convert_endpoint_response_to_version(
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/cadwyn/structure/versions.py", line 520, in _convert_endpoint_response_to_version
       response_or_response_body: Union[FastapiResponse, object] = await run_in_threadpool(
                                                                   ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/starlette/concurrency.py", line 38, in run_in_threadpool
       return await anyio.to_thread.run_sync(func)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/anyio/to_thread.py", line 56, in run_sync
       return await get_async_backend().run_sync_in_worker_thread(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 2485, in run_sync_in_worker_thread
       return await future
              ^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 976, in run
       result = context.run(func, *args)
                ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/cadwyn/schema_generation.py", line 515, in __call__
       return self._original_callable(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/api_fastapi/execution_api/routes/task_instances.py", line 256, in ti_run
       upstream_map_indexes = dict(
                              ^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/api_fastapi/execution_api/routes/task_instances.py", line 303, in _get_upstream_map_indexes
       elif task.get_closest_mapped_task_group() == upstream_mapped_group:
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 2272, in get_closest_mapped_task_group
       return next(self.iter_mapped_task_groups(), None)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 2263, in iter_mapped_task_groups
       yield from group.iter_mapped_task_groups()
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   ReferenceError: weakly-referenced object no longer exists
   ```
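
   Reading the last three frames together: `ti_run` asks `_get_upstream_map_indexes` to resolve upstream map indexes, which calls `get_closest_mapped_task_group()` on the serialized task, which in turn walks the enclosing TaskGroups. A rough sketch of the shape those frames imply (a hypothetical reconstruction for illustration, not the actual code in `serialized_objects.py`):

   ```python
   # Hypothetical reconstruction based only on the frames above; the method
   # names and the two failing lines appear in the traceback, the rest is a guess.
   def get_closest_mapped_task_group(self):
       # serialized_objects.py:2272 -- first mapped group up the hierarchy, if any
       return next(self.iter_mapped_task_groups(), None)

   def iter_mapped_task_groups(self):
       # serialized_objects.py:2263 -- delegate to the enclosing group. If that
       # group (or one of its parents) is held through a weakref.proxy whose
       # referent has been collected, the delegation raises
       # "ReferenceError: weakly-referenced object no longer exists".
       if (group := self.task_group) is not None:
           yield from group.iter_mapped_task_groups()
   ```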
   
   
   ### What you think should happen instead?
   
   All tasks in the task group should complete without errors.
   
   ### How to reproduce
   
   Here is a sample DAG. If the error does not show up right away, update the code to create a new version of the DAG, then clear a task inside the task group and wait for a task in the next task group to fail.
   
   ```python
   from functools import partial
   import random
   from time import sleep
   from datetime import datetime, timezone, timedelta

   from airflow.sdk import dag, task, task_group

   EXECUTOR_TYPE = "CeleryExecutor"


   @dag(
       schedule=None,
       start_date=datetime.now(timezone.utc) - timedelta(days=1),
       catchup=False,
       description=f"DAG mixing Celery and {EXECUTOR_TYPE} tasks using the TaskFlow API",
       tags=["example", "executor", "kubernetes", "celery"],
   )
   def load_testing_dag():
       # Change EXECUTOR_TYPE above to switch executors

       @task(executor="CeleryExecutor")
       def compute_load_task():
           print("This task is running with the CeleryExecutor!")
           sleep(60)  # Simulate load computation
           return "content IDs found"

       @task(executor="CeleryExecutor")
       def partition_task():
           print("This task is running with the CeleryExecutor!")
           sleep(1)  # Simulate load computation
           return [random.randint(10, 60) for _ in range(50)]

       @task(executor=EXECUTOR_TYPE)
       def load_task():
           print(f"This task is running with the {EXECUTOR_TYPE}!")
           sleep(120)  # Simulate load computation
           return "content IDs loaded"

       @task(executor=EXECUTOR_TYPE)
       def model_process(input_uri):
           print(f"Processing partition {input_uri} with {EXECUTOR_TYPE}")
           sleep(input_uri)  # Simulate processing time
           return random.randint(60, 120)

       def inference():

           @task_group(group_id="inference_tier1_task_group")
           def tg(input_uri):
               @task_group(group_id="features")
               def features(input_uri):
                   @task(executor=EXECUTOR_TYPE)
                   def metadata_task(input_uri):
                       print(f"Extracting features from {input_uri} with {EXECUTOR_TYPE}")
                       sleep(input_uri)  # Simulate processing time
                       return random.randint(60, 120)

                   @task(executor=EXECUTOR_TYPE)
                   def frame_extraction_task(input_uri):
                       print(f"Extracting features from {input_uri} with {EXECUTOR_TYPE}")
                       sleep(input_uri)  # Simulate processing time
                       return random.randint(60, 120)

                   metadata_task_output = metadata_task(input_uri)
                   frame_extraction_task_output = frame_extraction_task(input_uri=metadata_task_output)
                   return {"features": frame_extraction_task_output}

               @task_group(group_id="models")
               def process_models(input_uri):
                   models = []
                   for model in ["model_a", "model_b", "model_c"]:
                       models.append(partial(model_process.override(task_id=model))(input_uri))
                   return models

               @task_group(group_id="aggregation")
               def aggregation(input_uri):
                   @task(executor=EXECUTOR_TYPE)
                   def aggregate_task(input_uri):
                       print(f"Aggregating results from {input_uri} with {EXECUTOR_TYPE}")
                       sleep(10)  # Simulate processing time
                       return "aggregation complete"

                   return aggregate_task(input_uri)

               feature_outputs = features(input_uri=input_uri)
               model_output_uris = process_models(input_uri=feature_outputs["features"])
               return aggregation(input_uri=model_output_uris)

           return tg

       inference_tier_1 = inference()
       compute = compute_load_task()
       partition = partition_task()

       compute >> partition
       inference_tier_1.expand(input_uri=partition) >> load_task()


   load_testing_dag()
   ```
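
   For the "clear a task inside the task group" step, the server log above shows the endpoint involved (`POST /api/v2/dags/load_testing_dag/clearTaskInstances`). Below is a sketch of driving that step programmatically; the endpoint path is taken from the log, while the payload field names, the auth scheme, and the server address are assumptions to adapt to your deployment:

   ```python
   # Hypothetical helper to re-trigger the failure by clearing a mapped task
   # inside the nested task group. Endpoint path is taken from the server log;
   # payload fields and bearer-token auth are assumptions.
   import requests

   AIRFLOW_URL = "http://localhost:8080"  # assumed API server address
   TOKEN = "..."  # assumed access token for the API server

   resp = requests.post(
       f"{AIRFLOW_URL}/api/v2/dags/load_testing_dag/clearTaskInstances",
       headers={"Authorization": f"Bearer {TOKEN}"},
       json={
           "dry_run": False,
           # task id derived from the group ids in the DAG above
           "task_ids": ["inference_tier1_task_group.features.metadata_task"],
           "only_failed": False,
       },
   )
   resp.raise_for_status()
   print(resp.json())
   ```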
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Versions of Apache Airflow Providers
   
   - apache-airflow-providers-celery==3.13.1
   - apache-airflow-providers-cncf-kubernetes==10.10.0
   - apache-airflow-providers-common-compat==1.8.0
   - apache-airflow-providers-common-io==1.6.4
   - apache-airflow-providers-common-sql==1.28.2
   - apache-airflow-providers-fab==3.0.2
   - apache-airflow-providers-git==0.1.0
   - apache-airflow-providers-google==19.0.0
   - apache-airflow-providers-http==5.5.0
   - apache-airflow-providers-postgres==6.4.1
   - apache-airflow-providers-redis==4.3.3
   - apache-airflow-providers-smtp==2.3.1
   - apache-airflow-providers-snowflake==6.6.1
   - apache-airflow-providers-standard==1.9.1
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   This is an intermittent problem. Once the error occurs, all DAGs start failing.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

