dheerajturaga opened a new pull request, #58922:
URL: https://github.com/apache/airflow/pull/58922

   When clearing task instances with include_downstream or include_upstream 
enabled,
   the clear operation would fail with a NotMapped exception if any task was 
specified
   with a map_index tuple but wasn't actually a mapped task. This happened 
because the
   code assumed all tasks specified as tuples were mapped tasks and called
   get_mapped_ti_count() on them without handling the NotMapped exception.
   
   The fix catches the NotMapped exception and treats such tasks as normal 
tasks instead.
   
   related to: #57758
   
   Error seen in api-server when I try to clear task instances
   
   ```
   INFO:     172.18.0.1:48544 - "POST 
/api/v2/dags/example_bash_decorator/clearTaskInstances HTTP/1.1" 500 Internal 
Server Error
   ERROR:    Exception in ASGI application
   Traceback (most recent call last):
     File 
"/usr/python/lib/python3.10/site-packages/uvicorn/protocols/http/httptools_impl.py",
 line 409, in run_asgi
       result = await app(  # type: ignore[func-returns-value]
     File "/usr/python/lib/python3.10/site-packages/fastapi/applications.py", 
line 1134, in __call__
       await super().__call__(scope, receive, send)
     File "/usr/python/lib/python3.10/site-packages/starlette/applications.py", 
line 107, in __call__
       await self.middleware_stack(scope, receive, send)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/errors.py", line 
186, in __call__
       raise exc
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/errors.py", line 
164, in __call__
       await self.app(scope, receive, _send)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 
29, in __call__
       await responder(scope, receive, send)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 
130, in __call__
       await super().__call__(scope, receive, send)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 
46, in __call__
       await self.app(scope, receive, self.send_with_compression)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/cors.py", line 
93, in __call__
       await self.simple_response(scope, receive, send, request_headers=headers)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/cors.py", line 
144, in simple_response
       await self.app(scope, receive, send)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line 
191, in __call__
       with recv_stream, send_stream, collapse_excgroups():
     File "/usr/python/lib/python3.10/contextlib.py", line 153, in __exit__
       self.gen.throw(typ, value, traceback)
     File "/usr/python/lib/python3.10/site-packages/starlette/_utils.py", line 
85, in collapse_excgroups
       raise exc
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line 
193, in __call__
       response = await self.dispatch_func(request, call_next)
     File 
"/opt/airflow/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py",
 line 51, in dispatch
       response = await call_next(request)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line 
168, in call_next
       raise app_exc from app_exc.__cause__ or app_exc.__context__
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line 
144, in coro
       await self.app(scope, receive_or_disconnect, send_no_error)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/exceptions.py", 
line 63, in __call__
       await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", 
line 53, in wrapped_app
       raise exc
     File 
"/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", 
line 42, in wrapped_app
       await app(scope, receive, sender)
     File 
"/usr/python/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py",
 line 18, in __call__
       await self.app(scope, receive, send)
     File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line 
716, in __call__
       await self.middleware_stack(scope, receive, send)
     File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line 
736, in app
       await route.handle(scope, receive, send)
     File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line 
290, in handle
       await self.app(scope, receive, send)
     File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line 
125, in app
       await wrap_app_handling_exceptions(app, request)(scope, receive, send)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", 
line 53, in wrapped_app
       raise exc
     File 
"/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", 
line 42, in wrapped_app
       await app(scope, receive, sender)
     File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line 
111, in app
       response = await f(request)
     File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line 
391, in app
       raw_response = await run_endpoint_function(
     File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line 
292, in run_endpoint_function
       return await run_in_threadpool(dependant.call, **values)
     File "/usr/python/lib/python3.10/site-packages/starlette/concurrency.py", 
line 32, in run_in_threadpool
       return await anyio.to_thread.run_sync(func)
     File "/usr/python/lib/python3.10/site-packages/anyio/to_thread.py", line 
56, in run_sync
       return await get_async_backend().run_sync_in_worker_thread(
     File 
"/usr/python/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 
2485, in run_sync_in_worker_thread
       return await future
     File 
"/usr/python/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 
976, in run
       result = context.run(func, *args)
     File 
"/opt/airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py",
 line 800, in post_clear_task_instances
       _collect_relatives(dag_run_id, "downstream")
     File 
"/opt/airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py",
 line 771, in _collect_relatives
       relevant_relatives = find_relevant_relatives(
     File "/opt/airflow/airflow-core/src/airflow/models/taskinstance.py", line 
2346, in find_relevant_relatives
       _visit_relevant_relatives_for_mapped(mapped_tasks)
     File "/opt/airflow/airflow-core/src/airflow/models/taskinstance.py", line 
2315, in _visit_relevant_relatives_for_mapped
       ti_count = get_mapped_ti_count(task, run_id, session=session)
     File "/usr/python/lib/python3.10/functools.py", line 889, in wrapper
       return dispatch(args[0].__class__)(*args, **kw)
     File "/opt/airflow/airflow-core/src/airflow/models/mappedoperator.py", 
line 517, in _
       raise NotMapped()
   airflow.exceptions.NotMapped
   INFO:     172.18.0.1:48556 - "POST 
/api/v2/dags/example_bash_decorator/clearTaskInstances HTTP/1.1" 500 Internal 
Server Error
   ERROR:    Exception in ASGI application
   Traceback (most recent call last):
     File 
"/usr/python/lib/python3.10/site-packages/uvicorn/protocols/http/httptools_impl.py",
 line 409, in run_asgi
       result = await app(  # type: ignore[func-returns-value]
     File "/usr/python/lib/python3.10/site-packages/fastapi/applications.py", 
line 1134, in __call__
       await super().__call__(scope, receive, send)
     File "/usr/python/lib/python3.10/site-packages/starlette/applications.py", 
line 107, in __call__
       await self.middleware_stack(scope, receive, send)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/errors.py", line 
186, in __call__
       raise exc
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/errors.py", line 
164, in __call__
       await self.app(scope, receive, _send)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 
29, in __call__
       await responder(scope, receive, send)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 
130, in __call__
       await super().__call__(scope, receive, send)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 
46, in __call__
       await self.app(scope, receive, self.send_with_compression)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/cors.py", line 
93, in __call__
       await self.simple_response(scope, receive, send, request_headers=headers)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/cors.py", line 
144, in simple_response
       await self.app(scope, receive, send)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line 
191, in __call__
       with recv_stream, send_stream, collapse_excgroups():
     File "/usr/python/lib/python3.10/contextlib.py", line 153, in __exit__
       self.gen.throw(typ, value, traceback)
     File "/usr/python/lib/python3.10/site-packages/starlette/_utils.py", line 
85, in collapse_excgroups
       raise exc
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line 
193, in __call__
       response = await self.dispatch_func(request, call_next)
     File 
"/opt/airflow/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py",
 line 51, in dispatch
       response = await call_next(request)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line 
168, in call_next
       raise app_exc from app_exc.__cause__ or app_exc.__context__
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line 
144, in coro
       await self.app(scope, receive_or_disconnect, send_no_error)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/middleware/exceptions.py", 
line 63, in __call__
       await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", 
line 53, in wrapped_app
       raise exc
     File 
"/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", 
line 42, in wrapped_app
       await app(scope, receive, sender)
     File 
"/usr/python/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py",
 line 18, in __call__
       await self.app(scope, receive, send)
     File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line 
716, in __call__
       await self.middleware_stack(scope, receive, send)
     File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line 
736, in app
       await route.handle(scope, receive, send)
     File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line 
290, in handle
       await self.app(scope, receive, send)
     File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line 
125, in app
       await wrap_app_handling_exceptions(app, request)(scope, receive, send)
     File 
"/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", 
line 53, in wrapped_app
       raise exc
     File 
"/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", 
line 42, in wrapped_app
       await app(scope, receive, sender)
     File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line 
111, in app
       response = await f(request)
     File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line 
391, in app
       raw_response = await run_endpoint_function(
     File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line 
292, in run_endpoint_function
       return await run_in_threadpool(dependant.call, **values)
     File "/usr/python/lib/python3.10/site-packages/starlette/concurrency.py", 
line 32, in run_in_threadpool
       return await anyio.to_thread.run_sync(func)
     File "/usr/python/lib/python3.10/site-packages/anyio/to_thread.py", line 
56, in run_sync
       return await get_async_backend().run_sync_in_worker_thread(
     File 
"/usr/python/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 
2485, in run_sync_in_worker_thread
       return await future
     File 
"/usr/python/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 
976, in run
       result = context.run(func, *args)
     File 
"/opt/airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py",
 line 800, in post_clear_task_instances
       _collect_relatives(dag_run_id, "downstream")
     File 
"/opt/airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py",
 line 771, in _collect_relatives
       relevant_relatives = find_relevant_relatives(
     File "/opt/airflow/airflow-core/src/airflow/models/taskinstance.py", line 
2346, in find_relevant_relatives
       _visit_relevant_relatives_for_mapped(mapped_tasks)
     File "/opt/airflow/airflow-core/src/airflow/models/taskinstance.py", line 
2315, in _visit_relevant_relatives_for_mapped
       ti_count = get_mapped_ti_count(task, run_id, session=session)
     File "/usr/python/lib/python3.10/functools.py", line 889, in wrapper
       return dispatch(args[0].__class__)(*args, **kw)
     File "/opt/airflow/airflow-core/src/airflow/models/mappedoperator.py", 
line 517, in _
       raise NotMapped()
   airflow.exceptions.NotMapped
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to