chmnata opened a new issue, #52730: URL: https://github.com/apache/airflow/issues/52730
### Apache Airflow version 3.0.2 ### If "Other Airflow 2 version" selected, which one? _No response_ ### What happened? When there are skipped dynamically mapped taskgroup with a downstream task with `trigger_rule = all done`, the downstream tasks does not run and will get stuck in a queued state with `state match` with no task logs. ``` Executor LocalExecutor(parallelism=64) reported that the task instance <TaskInstance: task_state_changed_externally.any_task manual__2025-07-02T17:26:56.340268+00:00 [queued]> finished with state failed, but the task instance's state attribute is queued. Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally ``` ### What you think should happen instead? skipped upstream task should be considered "executed" therefore the downstream task with `trigger_rule = all done` should still run. ### How to reproduce A modified version from [this closed issue](https://github.com/apache/airflow/issues/51457), which resulted in the same error and were fixed in 3.0.2. However the same error persist with skipped upstream task and trigger_rule = all_done downstream task. ``` from airflow.sdk import dag, task_group, task from datetime import datetime from airflow.exceptions import AirflowSkipException, AirflowFailException @dag(start_date=datetime(2025, 6, 28), schedule=None, catchup=False) def task_state_changed_externally(): @task def get_nums(): raise AirflowSkipException("Skipping this one intentionally") return [1, 2] # creating a task group using the decorator with the dynamic input my_num @task_group(group_id="group1") def tg1(my_num): @task def print_num(num): raise AirflowSkipException("Skipping this one intentionally") return num @task def add_42(num): if num == 2: raise AirflowSkipException("Skipping this one intentionally") return num + 42 print_num(my_num) >> add_42(my_num) # a task downstream of dynamic task group @task(trigger_rule="all_done") def any_task(): print("Test") nums = get_nums() dynamic_tg = tg1.expand(my_num = nums) #a dynamic task group mapped over an xcom dynamic_tg >> any_task() #any task downstream of a mapped task task_state_changed_externally() ``` This is the error in api-server.log, ``` INFO: 127.0.0.1:54114 - "PATCH /airflow/execution/task-instances/0197cc2d-6471-7abe-b0db-f13c0242cdd9/run HTTP/1.1" 500 Internal Server Error ERROR: Exception in ASGI application + Exception Group Traceback (most recent call last): | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_utils.py", line 76, in collapse_excgroups | yield | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py", line 181, in __call__ | recv_stream.close() | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__ | raise BaseExceptionGroup( | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception) +-+---------------- 1 ---------------- | Traceback (most recent call last): | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/uvicorn/protocols/http/httptools_impl.py", line 409, in run_asgi | result = await app( # type: ignore[func-returns-value] | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/uvicorn/middleware/proxy_headers.py", line 60, in __call__ | return await self.app(scope, receive, send) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/fastapi/applications.py", line 1054, in __call__ | await super().__call__(scope, receive, send) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/applications.py", line 112, in __call__ | await self.middleware_stack(scope, receive, send) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/errors.py", line 187, in __call__ | raise exc | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/errors.py", line 165, in __call__ | await self.app(scope, receive, _send) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/gzip.py", line 29, in __call__ | await responder(scope, receive, send) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/gzip.py", line 126, in __call__ | await super().__call__(scope, receive, send) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/gzip.py", line 46, in __call__ | await self.app(scope, receive, self.send_with_compression) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/cors.py", line 85, in __call__ | await self.app(scope, receive, send) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py", line 183, in __call__ | raise app_exc | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py", line 141, in coro | await self.app(scope, receive_or_disconnect, send_no_error) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/exceptions.py", line 62, in __call__ | await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app | raise exc | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app | await app(scope, receive, sender) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/routing.py", line 714, in __call__ | await self.middleware_stack(scope, receive, send) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/routing.py", line 734, in app | await route.handle(scope, receive, send) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/routing.py", line 460, in handle | await self.app(scope, receive, send) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/applications.py", line 259, in __call__ | await self.__call__(scope, receive, send) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/fastapi/applications.py", line 1054, in __call__ | await super().__call__(scope, receive, send) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/applications.py", line 112, in __call__ | await self.middleware_stack(scope, receive, send) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/errors.py", line 187, in __call__ | raise exc | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/errors.py", line 165, in __call__ | await self.app(scope, receive, _send) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py", line 181, in __call__ | recv_stream.close() | File "/usr/lib64/python3.9/contextlib.py", line 137, in __exit__ | self.gen.throw(typ, value, traceback) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_utils.py", line 82, in collapse_excgroups | raise exc | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py", line 178, in __call__ | response = await self.dispatch_func(request, call_next) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/middleware.py", line 124, in dispatch | response = await call_next(request) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py", line 156, in call_next | raise app_exc | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py", line 141, in coro | await self.app(scope, receive_or_disconnect, send_no_error) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/exceptions.py", line 62, in __call__ | await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app | raise exc | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app | await app(scope, receive, sender) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/routing.py", line 93, in __call__ | await self.process_request(scope=scope, receive=receive, send=send, routes=routes) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/routing.py", line 131, in process_request | await route.handle(scope, receive, send) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/routing.py", line 288, in handle | await self.app(scope, receive, send) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/routing.py", line 76, in app | await wrap_app_handling_exceptions(app, request)(scope, receive, send) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app | raise exc | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app | await app(scope, receive, sender) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/routing.py", line 73, in app | response = await f(request) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/fastapi/routing.py", line 301, in app | raw_response = await run_endpoint_function( | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/fastapi/routing.py", line 212, in run_endpoint_function | return await dependant.call(**values) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/structure/versions.py", line 474, in decorator | response = await self._convert_endpoint_response_to_version( | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/structure/versions.py", line 520, in _convert_endpoint_response_to_version | response_or_response_body: Union[FastapiResponse, object] = await run_in_threadpool( | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/concurrency.py", line 37, in run_in_threadpool | return await anyio.to_thread.run_sync(func) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/anyio/to_thread.py", line 56, in run_sync | return await get_async_backend().run_sync_in_worker_thread( | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/anyio/_backends/_asyncio.py", line 2470, in run_sync_in_worker_thread | return await future | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/anyio/_backends/_asyncio.py", line 967, in run | result = context.run(func, *args) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/schema_generation.py", line 511, in __call__ | return self._original_callable(*args, **kwargs) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/airflow/api_fastapi/execution_api/routes/task_instances.py", line 258, in ti_run | upstream_map_indexes = dict( | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/airflow/api_fastapi/execution_api/routes/task_instances.py", line 311, in _get_upstream_map_ indexes | mapped_ti_count = upstream_mapped_group._expand_input.get_total_map_length( | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/airflow/models/expandinput.py", line 110, in get_total_map_length | lengths = self._get_map_lengths(run_id, session=session) | File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/airflow/models/expandinput.py", line 104, in _get_map_lengths | raise NotFullyPopulated(set(self.value).difference(map_lengths)) | airflow.sdk.definitions._internal.expandinput.NotFullyPopulated: Failed to populate all mapping metadata; missing: 'my_num' +------------------------------------ ``` ### Operating System Red Hat Enterprise Linux 8.10 (Ootpa) ### Versions of Apache Airflow Providers _No response_ ### Deployment Virtualenv installation ### Deployment details _No response_ ### Anything else? This happens every time consistently. Initially I thought it would be fixed with 3.0.2 with [this PR](https://github.com/apache/airflow/pull/50641) by @Lee-W . Might also be related to https://github.com/apache/airflow/issues/51320 , that's recently merged by https://github.com/apache/airflow/pull/51701, which will be released in 3.0.3. ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
