tirkarthi opened a new issue, #48786:
URL: https://github.com/apache/airflow/issues/48786
### Apache Airflow version
main (development)
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
Trying to run a DAG with a mapped task whose output serves as input to another
task causes the stack trace below while the source mapping task is still
executing. The related issue #48546 was closed as not reproducible, but it is
reproducible on main with the DAG below.
```
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py",
line 409, in run_asgi
result = await app( # type: ignore[func-returns-value]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py",
line 60, in __call__
return await self.app(scope, receive, send)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/fastapi/applications.py",
line 1054, in __call__
await super().__call__(scope, receive, send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/applications.py",
line 112, in __call__
await self.middleware_stack(scope, receive, send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py",
line 187, in __call__
raise exc
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py",
line 165, in __call__
await self.app(scope, receive, _send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/gzip.py",
line 29, in __call__
await responder(scope, receive, send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/gzip.py",
line 126, in __call__
await super().__call__(scope, receive, send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/gzip.py",
line 46, in __call__
await self.app(scope, receive, self.send_with_compression)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/cors.py",
line 85, in __call__
await self.app(scope, receive, send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/base.py",
line 173, in __call__
with recv_stream, send_stream, collapse_excgroups():
File "/usr/lib/python3.11/contextlib.py", line 158, in __exit__
self.gen.throw(typ, value, traceback)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_utils.py",
line 82, in collapse_excgroups
raise exc
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/base.py",
line 175, in __call__
response = await self.dispatch_func(request, call_next)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/api_fastapi/core_api/middleware.py",
line 28, in dispatch
response = await call_next(request)
^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/base.py",
line 153, in call_next
raise app_exc
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/base.py",
line 140, in coro
await self.app(scope, receive_or_disconnect, send_no_error)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/exceptions.py",
line 62, in __call__
await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py",
line 53, in wrapped_app
raise exc
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py",
line 42, in wrapped_app
await app(scope, receive, sender)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py",
line 714, in __call__
await self.middleware_stack(scope, receive, send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py",
line 734, in app
await route.handle(scope, receive, send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py",
line 288, in handle
await self.app(scope, receive, send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py",
line 76, in app
await wrap_app_handling_exceptions(app, request)(scope, receive, send)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py",
line 53, in wrapped_app
raise exc
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py",
line 42, in wrapped_app
await app(scope, receive, sender)
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py",
line 73, in app
response = await f(request)
^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/fastapi/routing.py",
line 301, in app
raw_response = await run_endpoint_function(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/fastapi/routing.py",
line 214, in run_endpoint_function
return await run_in_threadpool(dependant.call, **values)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/concurrency.py",
line 37, in run_in_threadpool
return await anyio.to_thread.run_sync(func)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/anyio/to_thread.py",
line 56, in run_sync
return await get_async_backend().run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py",
line 2470, in run_sync_in_worker_thread
return await future
^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py",
line 967, in run
result = context.run(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py",
line 224, in grid_data
fill_task_instance_summaries(
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py",
line 250, in fill_task_instance_summaries
task_count=_get_total_task_count(run_id,
task_node_map[task_id]["task_count"], session),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py",
line 143, in _get_total_task_count
return sum(
^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py",
line 147, in <genexpr>
DBBaseOperator.get_mapped_ti_count(node, run_id=run_id, session=session)
or 0
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/functools.py", line 946, in _method
return method.__get__(obj, cls)(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/models/baseoperator.py",
line 714, in _
current_count = exp_input.get_total_map_length(run_id, session=session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/models/expandinput.py",
line 129, in get_total_map_length
raise NotFullyPopulated({"expand_kwargs() argument"})
airflow.sdk.definitions._internal.expandinput.NotFullyPopulated: Failed to
populate all mapping metadata; missing: 'expand_kwargs() argument'
```
### What you think should happen instead?
_No response_
### How to reproduce
Run the DAG below and try to load the UI while the `data` task is running.
```python
# Reproducer DAG for the NotFullyPopulated error: load the UI while the
# upstream `data` task is still running.
from __future__ import annotations

import datetime

from airflow.decorators import task
from airflow.sdk.definitions.dag import DAG

with DAG(
    dag_id="gh48546",
    schedule=datetime.timedelta(days=30 * 365),
    start_date=datetime.datetime(1970, 1, 1),
    tags=["taskmap"],
) as expand_kwargs_mod_tf:

    @task
    def data():
        """Sleep for a long time, then return the entries to map over."""
        import time

        # Keep this task in the running state long enough to open the UI
        # before it has produced its return value.
        time.sleep(1000)
        return [
            ("hello {}", "foo"),
            ("goodbye {}", "bar"),
        ]

    def mapper(entry):
        """Return each upstream entry unchanged."""
        return entry

    @task(task_id="one_task")
    def printer(template, user):
        """Print ``template`` formatted with ``user``."""
        print(template.format(user))

    # The downstream task is expanded from the mapped output of `data`.
    printer.expand_kwargs(data().map(mapper))
```
### Operating System
Ubuntu 20.04
### Versions of Apache Airflow Providers
_No response_
### Deployment
Virtualenv installation
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]