tirkarthi commented on issue #48546:
URL: https://github.com/apache/airflow/issues/48546#issuecomment-2775753985

   @dstandish I noticed a very similar traceback similar to this issue. I guess 
from the exception `data()` is still not ready while being mapped and applied 
to `expand_kwargs` . The sleep delay might be making this more reliable in 
reproducing the issue.
   
   ```python
   from __future__ import annotations
   
   import datetime
   
   from airflow.decorators import task
   from airflow.sdk.definitions.dag import DAG
   
   with DAG(
       dag_id="gh48546",
       schedule=datetime.timedelta(days=30 * 365),
       start_date=datetime.datetime(1970, 1, 1),
       tags=["taskmap"],
   ) as expand_kwargs_mod_tf:
   
       @task
       def data():
           import time
   
           time.sleep(1000)
   
           return [
               ("hello {}", "foo"),
               ("goodbye {}", "bar"),
           ]
   
       def mapper(entry):
           return entry
   
       @task(task_id="one_task")
       def printer(template, user):
           print(template.format(user))
   
       printer.expand_kwargs(data().map(mapper))
   ```
   
   ```
   
   Traceback (most recent call last):
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py",
 line 409, in run_asgi
       result = await app(  # type: ignore[func-returns-value]
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py",
 line 60, in __call__
       return await self.app(scope, receive, send)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/fastapi/applications.py",
 line 1054, in __call__
       await super().__call__(scope, receive, send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/applications.py",
 line 112, in __call__
       await self.middleware_stack(scope, receive, send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py",
 line 187, in __call__
       raise exc
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py",
 line 165, in __call__
       await self.app(scope, receive, _send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/gzip.py",
 line 29, in __call__
       await responder(scope, receive, send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/gzip.py",
 line 126, in __call__
       await super().__call__(scope, receive, send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/gzip.py",
 line 46, in __call__
       await self.app(scope, receive, self.send_with_compression)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/cors.py",
 line 85, in __call__
       await self.app(scope, receive, send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/base.py",
 line 173, in __call__
       with recv_stream, send_stream, collapse_excgroups():
     File "/usr/lib/python3.11/contextlib.py", line 158, in __exit__
       self.gen.throw(typ, value, traceback)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_utils.py",
 line 82, in collapse_excgroups
       raise exc
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/base.py",
 line 175, in __call__
       response = await self.dispatch_func(request, call_next)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/api_fastapi/core_api/middleware.py",
 line 28, in dispatch
       response = await call_next(request)
                  ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/base.py",
 line 153, in call_next
       raise app_exc
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/base.py",
 line 140, in coro
       await self.app(scope, receive_or_disconnect, send_no_error)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/middleware/exceptions.py",
 line 62, in __call__
       await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py",
 line 53, in wrapped_app
       raise exc
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py",
 line 42, in wrapped_app
       await app(scope, receive, sender)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py",
 line 714, in __call__
       await self.middleware_stack(scope, receive, send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py",
 line 734, in app
       await route.handle(scope, receive, send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py",
 line 288, in handle
       await self.app(scope, receive, send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py",
 line 76, in app
       await wrap_app_handling_exceptions(app, request)(scope, receive, send)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py",
 line 53, in wrapped_app
       raise exc
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py",
 line 42, in wrapped_app
       await app(scope, receive, sender)
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/routing.py",
 line 73, in app
       response = await f(request)
                  ^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/fastapi/routing.py",
 line 301, in app
       raw_response = await run_endpoint_function(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/fastapi/routing.py",
 line 214, in run_endpoint_function
       return await run_in_threadpool(dependant.call, **values)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/starlette/concurrency.py",
 line 37, in run_in_threadpool
       return await anyio.to_thread.run_sync(func)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/anyio/to_thread.py",
 line 56, in run_sync
       return await get_async_backend().run_sync_in_worker_thread(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py",
 line 2470, in run_sync_in_worker_thread
       return await future
              ^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py",
 line 967, in run
       result = context.run(func, *args)
                ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py",
 line 210, in grid_data
       fill_task_instance_summaries(
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py",
 line 239, in fill_task_instance_summaries
       task_count=_get_total_task_count(run_id, 
task_node_map[task_id]["task_count"], session),
                  
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py",
 line 139, in _get_total_task_count
       return sum(
              ^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py",
 line 143, in <genexpr>
       DBBaseOperator.get_mapped_ti_count(node, run_id=run_id, session=session) 
or 0
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib/python3.11/functools.py", line 946, in _method
       return method.__get__(obj, cls)(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/models/baseoperator.py",
 line 714, in _
       current_count = exp_input.get_total_map_length(run_id, session=session)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/models/expandinput.py",
 line 129, in get_total_map_length
       raise NotFullyPopulated({"expand_kwargs() argument"})
   airflow.sdk.definitions._internal.expandinput.NotFullyPopulated: Failed to 
populate all mapping metadata; missing: 'expand_kwargs() argument'
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to