chmnata opened a new issue, #52730:
URL: https://github.com/apache/airflow/issues/52730

   ### Apache Airflow version
   
   3.0.2
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   When a dynamically mapped task group is skipped and has a downstream task 
with `trigger_rule="all_done"`, the downstream task does not run. It gets stuck 
in the queued state with a state mismatch and no task logs.
   
   ```
   Executor LocalExecutor(parallelism=64) reported that the task instance 
<TaskInstance: task_state_changed_externally.any_task 
manual__2025-07-02T17:26:56.340268+00:00 [queued]> finished with state failed, 
but the task instance's state attribute is queued. Learn more: 
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
  
   ```
   
   ### What you think should happen instead?
   
   A skipped upstream task should be considered "executed", so the 
downstream task with `trigger_rule="all_done"` should still run.
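
To make the expectation concrete, here is a minimal sketch of the semantics I have in mind for `all_done`. It is not Airflow's implementation: the function name and the set of finished states are assumptions chosen for illustration.

```python
# Simplified model of the "all_done" trigger rule (illustration only,
# not Airflow's actual code). The state names below are assumed.
FINISHED_STATES = {"success", "failed", "skipped", "upstream_failed"}


def all_done(upstream_states):
    """Return True when every upstream task instance has finished."""
    return all(state in FINISHED_STATES for state in upstream_states)


# Every upstream task was skipped: the downstream task should still run.
print(all_done(["skipped", "skipped"]))

# One upstream task is still running: the downstream task must wait.
print(all_done(["skipped", "running"]))
```

Under this reading, a fully skipped mapped task group satisfies the rule, so `any_task` in the DAG below would be expected to run.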
   
   ### How to reproduce
   
   This is a modified version of the DAG from [this closed 
issue](https://github.com/apache/airflow/issues/51457), which produced the 
same error and was fixed in 3.0.2. However, the same error persists when the 
upstream task is skipped and the downstream task uses `trigger_rule="all_done"`.
   
   ```
   from airflow.sdk import dag, task_group, task
   from datetime import datetime
   from airflow.exceptions import AirflowSkipException, AirflowFailException
   
   @dag(start_date=datetime(2025, 6, 28), schedule=None, catchup=False)
   def task_state_changed_externally():
       
       @task
       def get_nums():
           raise AirflowSkipException("Skipping this one intentionally")
   
           return [1, 2]
   
       # creating a task group using the decorator with the dynamic input my_num
       @task_group(group_id="group1")
       def tg1(my_num):
           @task
           def print_num(num):
               raise AirflowSkipException("Skipping this one intentionally")
               return num
   
           @task
           def add_42(num):
               if num == 2: 
                   raise AirflowSkipException("Skipping this one intentionally")
               return num + 42
   
           print_num(my_num) >> add_42(my_num)
           
       # a task downstream of dynamic task group
       @task(trigger_rule="all_done")
       def any_task():
           print("Test")
   
       nums = get_nums()
       # a dynamic task group mapped over an XCom
       dynamic_tg = tg1.expand(my_num=nums)
       # any task downstream of the mapped task group
       dynamic_tg >> any_task()
   
   task_state_changed_externally()
   ```
   
   This is the error in api-server.log:
   ```
   INFO:     127.0.0.1:54114 - "PATCH 
/airflow/execution/task-instances/0197cc2d-6471-7abe-b0db-f13c0242cdd9/run 
HTTP/1.1" 500 Internal Server Error
   ERROR:    Exception in ASGI application
     + Exception Group Traceback (most recent call last):
     |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_utils.py", 
line 76, in collapse_excgroups
     |     yield
     |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py",
 line 181, in __call__
     |     recv_stream.close()
     |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/anyio/_backends/_asyncio.py",
 line 772, in __aexit__
     |     raise BaseExceptionGroup(
     | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 
sub-exception)
     +-+---------------- 1 ----------------
       | Traceback (most recent call last):
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/uvicorn/protocols/http/httptools_impl.py",
 line 409, in run_asgi
       |     result = await app(  # type: ignore[func-returns-value]
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/uvicorn/middleware/proxy_headers.py",
 line 60, in __call__
       |     return await self.app(scope, receive, send)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/fastapi/applications.py",
 line 1054, in __call__
       |     await super().__call__(scope, receive, send)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/applications.py",
 line 112, in __call__
       |     await self.middleware_stack(scope, receive, send)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/errors.py",
 line 187, in __call__
       |     raise exc
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/errors.py",
 line 165, in __call__
       |     await self.app(scope, receive, _send)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/gzip.py",
 line 29, in __call__
       |     await responder(scope, receive, send)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/gzip.py",
 line 126, in __call__
       |     await super().__call__(scope, receive, send)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/gzip.py",
 line 46, in __call__
       |     await self.app(scope, receive, self.send_with_compression)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/cors.py",
 line 85, in __call__
       |     await self.app(scope, receive, send)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py",
 line 183, in __call__
       |     raise app_exc
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py",
 line 141, in coro
       |     await self.app(scope, receive_or_disconnect, send_no_error)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/exceptions.py",
 line 62, in __call__
       |     await wrap_app_handling_exceptions(self.app, conn)(scope, receive, 
send)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_exception_handler.py",
 line 53, in wrapped_app
       |     raise exc
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_exception_handler.py",
 line 42, in wrapped_app
       |     await app(scope, receive, sender)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/routing.py",
 line 714, in __call__
       |     await self.middleware_stack(scope, receive, send)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/routing.py",
 line 734, in app
       |     await route.handle(scope, receive, send)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/routing.py",
 line 460, in handle
       |     await self.app(scope, receive, send)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/applications.py",
 line 259, in __call__
       |     await self.__call__(scope, receive, send)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/fastapi/applications.py",
 line 1054, in __call__
       |     await super().__call__(scope, receive, send)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/applications.py",
 line 112, in __call__
       |     await self.middleware_stack(scope, receive, send)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/errors.py",
 line 187, in __call__
       |     raise exc
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/errors.py",
 line 165, in __call__
       |     await self.app(scope, receive, _send)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py",
 line 181, in __call__
       |     recv_stream.close()
       |   File "/usr/lib64/python3.9/contextlib.py", line 137, in __exit__
       |     self.gen.throw(typ, value, traceback)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_utils.py", 
line 82, in collapse_excgroups
       |     raise exc
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py",
 line 178, in __call__
       |     response = await self.dispatch_func(request, call_next)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/middleware.py",
 line 124, in dispatch
       |     response = await call_next(request)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py",
 line 156, in call_next
       |     raise app_exc
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py",
 line 141, in coro
       |     await self.app(scope, receive_or_disconnect, send_no_error)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/exceptions.py",
 line 62, in __call__
       |     await wrap_app_handling_exceptions(self.app, conn)(scope, receive, 
send)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_exception_handler.py",
 line 53, in wrapped_app
       |     raise exc
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_exception_handler.py",
 line 42, in wrapped_app
       |     await app(scope, receive, sender)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/routing.py", 
line 93, in __call__
       |     await self.process_request(scope=scope, receive=receive, 
send=send, routes=routes)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/routing.py", 
line 131, in process_request
       |     await route.handle(scope, receive, send)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/routing.py",
 line 288, in handle
       |     await self.app(scope, receive, send)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/routing.py",
 line 76, in app
       |     await wrap_app_handling_exceptions(app, request)(scope, receive, 
send)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_exception_handler.py",
 line 53, in wrapped_app
       |     raise exc
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_exception_handler.py",
 line 42, in wrapped_app
       |     await app(scope, receive, sender)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/routing.py",
 line 73, in app
       |     response = await f(request)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/fastapi/routing.py", 
line 301, in app
       |     raw_response = await run_endpoint_function(
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/fastapi/routing.py", 
line 212, in run_endpoint_function
       |     return await dependant.call(**values)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/structure/versions.py",
 line 474, in decorator
       |     response = await self._convert_endpoint_response_to_version(
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/structure/versions.py",
 line 520, in _convert_endpoint_response_to_version
       |     response_or_response_body: Union[FastapiResponse, object] = await 
run_in_threadpool(
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/concurrency.py",
 line 37, in run_in_threadpool
       |     return await anyio.to_thread.run_sync(func)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/anyio/to_thread.py", 
line 56, in run_sync
       |     return await get_async_backend().run_sync_in_worker_thread(
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/anyio/_backends/_asyncio.py",
 line 2470, in run_sync_in_worker_thread
       |     return await future
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/anyio/_backends/_asyncio.py",
 line 967, in run
       |     result = context.run(func, *args)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/schema_generation.py",
 line 511, in __call__
       |     return self._original_callable(*args, **kwargs)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/airflow/api_fastapi/execution_api/routes/task_instances.py",
 line 258, in ti_run
       |     upstream_map_indexes = dict(
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/airflow/api_fastapi/execution_api/routes/task_instances.py",
 line 311, in _get_upstream_map_indexes
       |     mapped_ti_count = 
upstream_mapped_group._expand_input.get_total_map_length(
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/airflow/models/expandinput.py",
 line 110, in get_total_map_length
       |     lengths = self._get_map_lengths(run_id, session=session)
       |   File 
"/home/airflow/airflow_venv/lib64/python3.9/site-packages/airflow/models/expandinput.py",
 line 104, in _get_map_lengths
       |     raise NotFullyPopulated(set(self.value).difference(map_lengths))
       | airflow.sdk.definitions._internal.expandinput.NotFullyPopulated: 
Failed to populate all mapping metadata; missing: 'my_num'
       +------------------------------------
   ```
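
My reading of the traceback, which may be incomplete: when `any_task` starts, the `ti_run` route calls `_get_upstream_map_indexes`, which asks the mapped task group's expand input for its total map length, and that lookup raises because no length exists for `my_num`. Presumably the skipped `get_nums` never pushed an XCom. The sketch below models only that failure mode in plain Python. The exception class and `total_map_length` are stand-ins written for illustration, not Airflow's code.

```python
class NotFullyPopulated(RuntimeError):
    """Stand-in for Airflow's exception of the same name (illustration only)."""

    def __init__(self, missing):
        self.missing = set(missing)
        super().__init__(
            f"Failed to populate all mapping metadata; missing: {sorted(self.missing)}"
        )


def total_map_length(expand_keys, recorded_lengths):
    """Multiply the recorded lengths of all expand keys.

    recorded_lengths is a hypothetical dict of key -> length, filled in
    only when the upstream task actually produced a value.
    """
    missing = set(expand_keys) - set(recorded_lengths)
    if missing:
        raise NotFullyPopulated(missing)
    total = 1
    for key in expand_keys:
        total *= recorded_lengths[key]
    return total


# get_nums succeeded and returned [1, 2]: the length is known.
print(total_map_length(["my_num"], {"my_num": 2}))

# get_nums was skipped: nothing was recorded for my_num, so the lookup fails.
try:
    total_map_length(["my_num"], {})
except NotFullyPopulated as exc:
    print(exc)
```

If this model is right, the lookup would need to tolerate a skipped upstream (for example by treating the missing length as zero mapped instances) rather than raising and returning a 500 to the task runner.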
   
   ### Operating System
   
   Red Hat Enterprise Linux 8.10 (Ootpa)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   This happens consistently, every time. Initially I thought it would be fixed 
in 3.0.2 by [this PR](https://github.com/apache/airflow/pull/50641) from 
@Lee-W.
   
   It might also be related to https://github.com/apache/airflow/issues/51320, 
whose fix was recently merged in https://github.com/apache/airflow/pull/51701 
and will be released in 3.0.3.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

