dheerajturaga opened a new pull request, #58922:
URL: https://github.com/apache/airflow/pull/58922
When clearing task instances with include_downstream or include_upstream
enabled,
the clear operation would fail with a NotMapped exception if any task was
specified
with a map_index tuple but wasn't actually a mapped task. This happened
because the
code assumed all tasks specified as tuples were mapped tasks and called
get_mapped_ti_count() on them without handling the NotMapped exception.
The fix catches the NotMapped exception and treats such tasks as normal
tasks instead.
related to: #57758
Error seen in api-server when I try to clear task instances
```
INFO: 172.18.0.1:48544 - "POST
/api/v2/dags/example_bash_decorator/clearTaskInstances HTTP/1.1" 500 Internal
Server Error
ERROR: Exception in ASGI application
Traceback (most recent call last):
File
"/usr/python/lib/python3.10/site-packages/uvicorn/protocols/http/httptools_impl.py",
line 409, in run_asgi
result = await app( # type: ignore[func-returns-value]
File "/usr/python/lib/python3.10/site-packages/fastapi/applications.py",
line 1134, in __call__
await super().__call__(scope, receive, send)
File "/usr/python/lib/python3.10/site-packages/starlette/applications.py",
line 107, in __call__
await self.middleware_stack(scope, receive, send)
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/errors.py", line
186, in __call__
raise exc
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/errors.py", line
164, in __call__
await self.app(scope, receive, _send)
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line
29, in __call__
await responder(scope, receive, send)
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line
130, in __call__
await super().__call__(scope, receive, send)
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line
46, in __call__
await self.app(scope, receive, self.send_with_compression)
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/cors.py", line
93, in __call__
await self.simple_response(scope, receive, send, request_headers=headers)
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/cors.py", line
144, in simple_response
await self.app(scope, receive, send)
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line
191, in __call__
with recv_stream, send_stream, collapse_excgroups():
File "/usr/python/lib/python3.10/contextlib.py", line 153, in __exit__
self.gen.throw(typ, value, traceback)
File "/usr/python/lib/python3.10/site-packages/starlette/_utils.py", line
85, in collapse_excgroups
raise exc
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line
193, in __call__
response = await self.dispatch_func(request, call_next)
File
"/opt/airflow/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py",
line 51, in dispatch
response = await call_next(request)
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line
168, in call_next
raise app_exc from app_exc.__cause__ or app_exc.__context__
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line
144, in coro
await self.app(scope, receive_or_disconnect, send_no_error)
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/exceptions.py",
line 63, in __call__
await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
File
"/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py",
line 53, in wrapped_app
raise exc
File
"/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py",
line 42, in wrapped_app
await app(scope, receive, sender)
File
"/usr/python/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py",
line 18, in __call__
await self.app(scope, receive, send)
File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line
716, in __call__
await self.middleware_stack(scope, receive, send)
File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line
736, in app
await route.handle(scope, receive, send)
File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line
290, in handle
await self.app(scope, receive, send)
File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line
125, in app
await wrap_app_handling_exceptions(app, request)(scope, receive, send)
File
"/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py",
line 53, in wrapped_app
raise exc
File
"/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py",
line 42, in wrapped_app
await app(scope, receive, sender)
File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line
111, in app
response = await f(request)
File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line
391, in app
raw_response = await run_endpoint_function(
File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line
292, in run_endpoint_function
return await run_in_threadpool(dependant.call, **values)
File "/usr/python/lib/python3.10/site-packages/starlette/concurrency.py",
line 32, in run_in_threadpool
return await anyio.to_thread.run_sync(func)
File "/usr/python/lib/python3.10/site-packages/anyio/to_thread.py", line
56, in run_sync
return await get_async_backend().run_sync_in_worker_thread(
File
"/usr/python/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line
2485, in run_sync_in_worker_thread
return await future
File
"/usr/python/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line
976, in run
result = context.run(func, *args)
File
"/opt/airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py",
line 800, in post_clear_task_instances
_collect_relatives(dag_run_id, "downstream")
File
"/opt/airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py",
line 771, in _collect_relatives
relevant_relatives = find_relevant_relatives(
File "/opt/airflow/airflow-core/src/airflow/models/taskinstance.py", line
2346, in find_relevant_relatives
_visit_relevant_relatives_for_mapped(mapped_tasks)
File "/opt/airflow/airflow-core/src/airflow/models/taskinstance.py", line
2315, in _visit_relevant_relatives_for_mapped
ti_count = get_mapped_ti_count(task, run_id, session=session)
File "/usr/python/lib/python3.10/functools.py", line 889, in wrapper
return dispatch(args[0].__class__)(*args, **kw)
File "/opt/airflow/airflow-core/src/airflow/models/mappedoperator.py",
line 517, in _
raise NotMapped()
airflow.exceptions.NotMapped
INFO: 172.18.0.1:48556 - "POST
/api/v2/dags/example_bash_decorator/clearTaskInstances HTTP/1.1" 500 Internal
Server Error
ERROR: Exception in ASGI application
Traceback (most recent call last):
File
"/usr/python/lib/python3.10/site-packages/uvicorn/protocols/http/httptools_impl.py",
line 409, in run_asgi
result = await app( # type: ignore[func-returns-value]
File "/usr/python/lib/python3.10/site-packages/fastapi/applications.py",
line 1134, in __call__
await super().__call__(scope, receive, send)
File "/usr/python/lib/python3.10/site-packages/starlette/applications.py",
line 107, in __call__
await self.middleware_stack(scope, receive, send)
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/errors.py", line
186, in __call__
raise exc
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/errors.py", line
164, in __call__
await self.app(scope, receive, _send)
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line
29, in __call__
await responder(scope, receive, send)
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line
130, in __call__
await super().__call__(scope, receive, send)
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line
46, in __call__
await self.app(scope, receive, self.send_with_compression)
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/cors.py", line
93, in __call__
await self.simple_response(scope, receive, send, request_headers=headers)
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/cors.py", line
144, in simple_response
await self.app(scope, receive, send)
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line
191, in __call__
with recv_stream, send_stream, collapse_excgroups():
File "/usr/python/lib/python3.10/contextlib.py", line 153, in __exit__
self.gen.throw(typ, value, traceback)
File "/usr/python/lib/python3.10/site-packages/starlette/_utils.py", line
85, in collapse_excgroups
raise exc
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line
193, in __call__
response = await self.dispatch_func(request, call_next)
File
"/opt/airflow/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py",
line 51, in dispatch
response = await call_next(request)
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line
168, in call_next
raise app_exc from app_exc.__cause__ or app_exc.__context__
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line
144, in coro
await self.app(scope, receive_or_disconnect, send_no_error)
File
"/usr/python/lib/python3.10/site-packages/starlette/middleware/exceptions.py",
line 63, in __call__
await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
File
"/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py",
line 53, in wrapped_app
raise exc
File
"/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py",
line 42, in wrapped_app
await app(scope, receive, sender)
File
"/usr/python/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py",
line 18, in __call__
await self.app(scope, receive, send)
File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line
716, in __call__
await self.middleware_stack(scope, receive, send)
File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line
736, in app
await route.handle(scope, receive, send)
File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line
290, in handle
await self.app(scope, receive, send)
File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line
125, in app
await wrap_app_handling_exceptions(app, request)(scope, receive, send)
File
"/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py",
line 53, in wrapped_app
raise exc
File
"/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py",
line 42, in wrapped_app
await app(scope, receive, sender)
File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line
111, in app
response = await f(request)
File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line
391, in app
raw_response = await run_endpoint_function(
File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line
292, in run_endpoint_function
return await run_in_threadpool(dependant.call, **values)
File "/usr/python/lib/python3.10/site-packages/starlette/concurrency.py",
line 32, in run_in_threadpool
return await anyio.to_thread.run_sync(func)
File "/usr/python/lib/python3.10/site-packages/anyio/to_thread.py", line
56, in run_sync
return await get_async_backend().run_sync_in_worker_thread(
File
"/usr/python/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line
2485, in run_sync_in_worker_thread
return await future
File
"/usr/python/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line
976, in run
result = context.run(func, *args)
File
"/opt/airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py",
line 800, in post_clear_task_instances
_collect_relatives(dag_run_id, "downstream")
File
"/opt/airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py",
line 771, in _collect_relatives
relevant_relatives = find_relevant_relatives(
File "/opt/airflow/airflow-core/src/airflow/models/taskinstance.py", line
2346, in find_relevant_relatives
_visit_relevant_relatives_for_mapped(mapped_tasks)
File "/opt/airflow/airflow-core/src/airflow/models/taskinstance.py", line
2315, in _visit_relevant_relatives_for_mapped
ti_count = get_mapped_ti_count(task, run_id, session=session)
File "/usr/python/lib/python3.10/functools.py", line 889, in wrapper
return dispatch(args[0].__class__)(*args, **kw)
File "/opt/airflow/airflow-core/src/airflow/models/mappedoperator.py",
line 517, in _
raise NotMapped()
airflow.exceptions.NotMapped
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]