varaprasadregani opened a new issue, #59093:
URL: https://github.com/apache/airflow/issues/59093

   ### Apache Airflow version
   
   3.1.3
   
   ### If "Other Airflow 2/3 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   When using XCom.get_value from the Task SDK / compat SDK inside an extra 
link plugin, the Airflow webserver/API fails with:
   `ImportError: cannot import name 'SUPERVISOR_COMMS' from 
'airflow.sdk.execution_time.task_runner'`
   
   This happens when the UI calls the /api/v2/.../links endpoint to resolve 
extra links for a task instance.
   
   The same plugin works fine without the XCom.get_value(...) call (links 
render correctly). The problem only appears as soon as XCom.get_value is used 
in the BaseOperatorLink.get_link() implementation.
   
   This looks like a bug / unsupported usage path where the Task SDK assumes it 
is running in a task runner context (with SUPERVISOR_COMMS) even though extra 
links are resolved in the API server / webserver context.
   
   ### What you think should happen instead?
   
        Using XCom.get_value(key="data_doc", ti_key=ti_key) inside 
BaseOperatorLink.get_link() shouldn’t crash the /api/v2/.../links endpoint.
   If there is no XCom row, I’d expect None or similar; the extra link should 
either:
        •       return None/empty and be hidden, or
        •       return the URL string and render normally.
   
   ### How to reproduce
   
   1.   Add the plugin file above to $AIRFLOW_HOME/plugins (or the Astro 
project plugins/ directory).
   ```from airflow.providers.common.compat.sdk import BaseOperator, 
BaseOperatorLink, XCom
   
   from airflow.models.taskinstancekey import TaskInstanceKey
   from airflow.plugins_manager import AirflowPlugin
   
   from airflow.providers.standard.operators.python import PythonOperator
   
   
   class RerunTask(BaseOperatorLink):
       name = "Rerun Task"
   
       def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
           azure_url_code = XCom.get_value(key="data_doc", ti_key=ti_key)
           return "https://www.astronomer.io/";
   
   
   class PythonOperatorLink(BaseOperatorLink):
       name = "GE Result"
   
       def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
           azure_url_code = XCom.get_value(key="data_doc", ti_key=ti_key)
           return 'https://google.com'
   
   class AirflowExtraLinkPlugin(AirflowPlugin):
       name = "extra_link_plugin"
   
       operator_extra_links = [
           RerunTask(),
       ]
   
       global_operator_extra_links = [
           PythonOperatorLink(),
           RerunTask(),
       ]```
   2.   Restart Airflow.
   3.   Trigger the example_extra_links DAG once.
   ```
   from airflow import DAG
   from airflow.providers.standard.operators.python import PythonOperator
   from datetime import datetime
   
   with DAG(
       "example_extra_links",
       start_date=datetime(2025, 1, 1),
       schedule=None,
       catchup=False,
   ):
       def my_task(**context):
           context["ti"].xcom_push(key="data_doc", 
value="https://example.com/ge_result";)
   
       python_task = PythonOperator(
           task_id="python_task_traditional",
           python_callable=my_task,
       )
   ```
   4.   Go to Grid view → click python_task → open Details.
   5.   API Server logs show the SUPERVISOR_COMMS ImportError.
   
   ### Operating System
   
   MacOS
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   **Full Traceback**:
   ```INFO:     192.168.107.1:52298 - "GET 
/api/v2/dags/example_extra_links/dagRuns/manual__2025-12-05T11%3A40%3A13%2B00%3A00/taskInstances/python_task_traditional/links?map_index=-1
 HTTP/1.1" 500 Internal Server Error
   ERROR:    Exception in ASGI application
     + Exception Group Traceback (most recent call last):
     |   File "/usr/local/lib/python3.12/site-packages/starlette/_utils.py", 
line 79, in collapse_excgroups
     |     yield
     |   File 
"/usr/local/lib/python3.12/site-packages/starlette/middleware/base.py", line 
183, in __call__
     |     async with anyio.create_task_group() as task_group:
     |                ^^^^^^^^^^^^^^^^^^^^^^^^^
     |   File 
"/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 
781, in __aexit__
     |     raise BaseExceptionGroup(
     | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
     +-+---------------- 1 ----------------
       | Traceback (most recent call last):
       |   File 
"/usr/local/lib/python3.12/site-packages/uvicorn/protocols/http/httptools_impl.py",
 line 409, in run_asgi
       |     result = await app(  # type: ignore[func-returns-value]
       |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
       |   File 
"/usr/local/lib/python3.12/site-packages/fastapi/applications.py", line 1082, 
in __call__
       |     await super().__call__(scope, receive, send)
       |   File 
"/usr/local/lib/python3.12/site-packages/starlette/applications.py", line 113, 
in __call__
       |     await self.middleware_stack(scope, receive, send)
       |   File 
"/usr/local/lib/python3.12/site-packages/starlette/middleware/errors.py", line 
186, in __call__
       |     raise exc
       |   File 
"/usr/local/lib/python3.12/site-packages/starlette/middleware/errors.py", line 
164, in __call__
       |     await self.app(scope, receive, _send)
       |   File 
"/usr/local/lib/python3.12/site-packages/starlette/middleware/gzip.py", line 
29, in __call__
       |     await responder(scope, receive, send)
       |   File 
"/usr/local/lib/python3.12/site-packages/starlette/middleware/gzip.py", line 
130, in __call__
       |     await super().__call__(scope, receive, send)
       |   File 
"/usr/local/lib/python3.12/site-packages/starlette/middleware/gzip.py", line 
46, in __call__
       |     await self.app(scope, receive, self.send_with_compression)
       |   File 
"/usr/local/lib/python3.12/site-packages/starlette/middleware/cors.py", line 
85, in __call__
       |     await self.app(scope, receive, send)
       |   File 
"/usr/local/lib/python3.12/site-packages/starlette/middleware/base.py", line 
182, in __call__
       |     with recv_stream, send_stream, collapse_excgroups():
       |                                    ^^^^^^^^^^^^^^^^^^^^
       |   File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
       |     self.gen.throw(value)
       |   File "/usr/local/lib/python3.12/site-packages/starlette/_utils.py", 
line 85, in collapse_excgroups
       |     raise exc
       |   File 
"/usr/local/lib/python3.12/site-packages/starlette/middleware/base.py", line 
184, in __call__
       |     response = await self.dispatch_func(request, call_next)
       |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
       |   File 
"/usr/local/lib/python3.12/site-packages/astronomer/runtime/plugin.py", line 
90, in dispatch
       |     response = await call_next(request)
       |                ^^^^^^^^^^^^^^^^^^^^^^^^
       |   File 
"/usr/local/lib/python3.12/site-packages/starlette/middleware/base.py", line 
159, in call_next
       |     raise app_exc
       |   File 
"/usr/local/lib/python3.12/site-packages/starlette/middleware/base.py", line 
144, in coro
       |     await self.app(scope, receive_or_disconnect, send_no_error)
       |   File 
"/usr/local/lib/python3.12/site-packages/starlette/middleware/exceptions.py", 
line 63, in __call__
       |     await wrap_app_handling_exceptions(self.app, conn)(scope, receive, 
send)
       |   File 
"/usr/local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 
53, in wrapped_app
       |     raise exc
       |   File 
"/usr/local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 
42, in wrapped_app
       |     await app(scope, receive, sender)
       |   File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", 
line 716, in __call__
       |     await self.middleware_stack(scope, receive, send)
       |   File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", 
line 736, in app
       |     await route.handle(scope, receive, send)
       |   File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", 
line 290, in handle
       |     await self.app(scope, receive, send)
       |   File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", 
line 78, in app
       |     await wrap_app_handling_exceptions(app, request)(scope, receive, 
send)
       |   File 
"/usr/local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 
53, in wrapped_app
       |     raise exc
       |   File 
"/usr/local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 
42, in wrapped_app
       |     await app(scope, receive, sender)
       |   File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", 
line 75, in app
       |     response = await f(request)
       |                ^^^^^^^^^^^^^^^^
       |   File "/usr/local/lib/python3.12/site-packages/fastapi/routing.py", 
line 308, in app
       |     raw_response = await run_endpoint_function(
       |                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
       |   File "/usr/local/lib/python3.12/site-packages/fastapi/routing.py", 
line 221, in run_endpoint_function
       |     return await run_in_threadpool(dependant.call, **values)
       |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
       |   File 
"/usr/local/lib/python3.12/site-packages/starlette/concurrency.py", line 38, in 
run_in_threadpool
       |     return await anyio.to_thread.run_sync(func)
       |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
       |   File "/usr/local/lib/python3.12/site-packages/anyio/to_thread.py", 
line 56, in run_sync
       |     return await get_async_backend().run_sync_in_worker_thread(
       |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
       |   File 
"/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 
2485, in run_sync_in_worker_thread
       |     return await future
       |            ^^^^^^^^^^^^
       |   File 
"/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 
976, in run
       |     result = context.run(func, *args)
       |              ^^^^^^^^^^^^^^^^^^^^^^^^
       |   File 
"/usr/local/lib/python3.12/site-packages/airflow/api_fastapi/core_api/routes/public/extra_links.py",
 line 81, in get_extra_links
       |     all_extra_links = {link_name: link_url or None for link_name, 
link_url in sorted(all_extra_link_pairs)}
       |                                                                        
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
       |   File 
"/usr/local/lib/python3.12/site-packages/airflow/api_fastapi/core_api/routes/public/extra_links.py",
 line 79, in <genexpr>
       |     (link_name, task.get_extra_links(ti, link_name)) for link_name in 
task.extra_links
       |                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
       |   File 
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
 line 1407, in get_extra_links
       |     return link.get_link(self, ti_key=ti.key)  # type: ignore[arg-type]
       |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
       |   File "/usr/local/airflow/plugins/extra_link_plugin.py", line 21, in 
get_link
       |     azure_url_code = XCom.get_value(key="data_doc", ti_key=ti_key)
       |                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
       |   File 
"/usr/local/lib/python3.12/site-packages/airflow/sdk/bases/xcom.py", line 153, 
in get_value
       |     return cls.get_one(
       |            ^^^^^^^^^^^^
       |   File 
"/usr/local/lib/python3.12/site-packages/airflow/sdk/bases/xcom.py", line 246, 
in get_one
       |     from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
       | ImportError: cannot import name 'SUPERVISOR_COMMS' from 
'airflow.sdk.execution_time.task_runner' 
(/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)```
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to