tirkarthi opened a new issue, #48786:
URL: https://github.com/apache/airflow/issues/48786

   ### Apache Airflow version
   
   main (development)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   Trying to run a dag with mapped task whose output serves as input to another 
task causes the below stacktrace when the source mapping task is still 
executing. Related issue #48546 got closed as not reproducible but it's 
reproducible on main with below dag.
   
   
   ```
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py",
 line 409, in run_asgi
       result = await app(  # type: ignore[func-returns-value]
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py",
 line 60, in __call__
       return await self.app(scope, receive, send)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/fastapi/applications.py",
 line 1054, in __call__
       await super().__call__(scope, receive, send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/applications.py",
 line 112, in __call__
       await self.middleware_stack(scope, receive, send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py",
 line 187, in __call__
       raise exc
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py",
 line 165, in __call__
       await self.app(scope, receive, _send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/gzip.py",
 line 29, in __call__
       await responder(scope, receive, send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/gzip.py",
 line 126, in __call__
       await super().__call__(scope, receive, send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/gzip.py",
 line 46, in __call__
       await self.app(scope, receive, self.send_with_compression)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/cors.py",
 line 85, in __call__
       await self.app(scope, receive, send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/base.py",
 line 173, in __call__
       with recv_stream, send_stream, collapse_excgroups():
     File "/usr/lib/python3.11/contextlib.py", line 158, in __exit__
       self.gen.throw(typ, value, traceback)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_utils.py",
 line 82, in collapse_excgroups
       raise exc
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/base.py",
 line 175, in __call__
       response = await self.dispatch_func(request, call_next)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/api_fastapi/core_api/middleware.py",
 line 28, in dispatch
       response = await call_next(request)
                  ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/base.py",
 line 153, in call_next
       raise app_exc
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/base.py",
 line 140, in coro
       await self.app(scope, receive_or_disconnect, send_no_error)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/exceptions.py",
 line 62, in __call__
       await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py",
 line 53, in wrapped_app
       raise exc
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py",
 line 42, in wrapped_app
       await app(scope, receive, sender)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py",
 line 714, in __call__
       await self.middleware_stack(scope, receive, send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py",
 line 734, in app
       await route.handle(scope, receive, send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py",
 line 288, in handle
       await self.app(scope, receive, send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py",
 line 76, in app
       await wrap_app_handling_exceptions(app, request)(scope, receive, send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py",
 line 53, in wrapped_app
       raise exc
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py",
 line 42, in wrapped_app
       await app(scope, receive, sender)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py",
 line 73, in app
       response = await f(request)
                  ^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/fastapi/routing.py",
 line 301, in app
       raw_response = await run_endpoint_function(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/fastapi/routing.py",
 line 214, in run_endpoint_function
       return await run_in_threadpool(dependant.call, **values)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/concurrency.py",
 line 37, in run_in_threadpool
       return await anyio.to_thread.run_sync(func)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/anyio/to_thread.py",
 line 56, in run_sync
       return await get_async_backend().run_sync_in_worker_thread(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py",
 line 2470, in run_sync_in_worker_thread
       return await future
              ^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py",
 line 967, in run
       result = context.run(func, *args)
                ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py",
 line 224, in grid_data
       fill_task_instance_summaries(
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py",
 line 250, in fill_task_instance_summaries
       task_count=_get_total_task_count(run_id, 
task_node_map[task_id]["task_count"], session),
                  
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py",
 line 143, in _get_total_task_count
       return sum(
              ^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py",
 line 147, in <genexpr>
       DBBaseOperator.get_mapped_ti_count(node, run_id=run_id, session=session) 
or 0
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib/python3.11/functools.py", line 946, in _method
       return method.__get__(obj, cls)(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/models/baseoperator.py",
 line 714, in _
       current_count = exp_input.get_total_map_length(run_id, session=session)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/models/expandinput.py",
 line 129, in get_total_map_length
       raise NotFullyPopulated({"expand_kwargs() argument"})
   airflow.sdk.definitions._internal.expandinput.NotFullyPopulated: Failed to 
populate all mapping metadata; missing: 'expand_kwargs() argument'
   ```
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   Run the below dag and try to load the UI with `data` task running.
   
   ```python
   from __future__ import annotations
   
   import datetime
   
   from airflow.decorators import task
   from airflow.sdk.definitions.dag import DAG
   
   with DAG(
       dag_id="gh48546",
       schedule=datetime.timedelta(days=30 * 365),
       start_date=datetime.datetime(1970, 1, 1),
       tags=["taskmap"],
   ) as expand_kwargs_mod_tf:
   
       @task
       def data():
           import time
   
           time.sleep(1000)
   
           return [
               ("hello {}", "foo"),
               ("goodbye {}", "bar"),
           ]
   
       def mapper(entry):
           return entry
   
       @task(task_id="one_task")
       def printer(template, user):
           print(template.format(user))
   
       printer.expand_kwargs(data().map(mapper))
   ```
   
   ### Operating System
   
   Ubuntu 20.04
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to