varaprasadregani opened a new issue, #59093:
URL: https://github.com/apache/airflow/issues/59093
### Apache Airflow version
3.1.3
### If "Other Airflow 2/3 version" selected, which one?
_No response_
### What happened?
When using XCom.get_value from the Task SDK / compat SDK inside an extra
link plugin, the Airflow webserver/API fails with:
`ImportError: cannot import name 'SUPERVISOR_COMMS' from
'airflow.sdk.execution_time.task_runner'`
This happens when the UI calls the /api/v2/.../links endpoint to resolve
extra links for a task instance.
The same plugin works fine without the `XCom.get_value(...)` call (links
render correctly). The problem appears only once `XCom.get_value` is called
in the `BaseOperatorLink.get_link()` implementation.
This looks like a bug / unsupported usage path where the Task SDK assumes it
is running in a task runner context (with SUPERVISOR_COMMS) even though extra
links are resolved in the API server / webserver context.
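
One plausible reading of the traceback (an assumption, not confirmed in this report) is that `SUPERVISOR_COMMS` is a module-level name that only receives a value once a task process has started, so a lazy `from ... import` fails in any other process. The sketch below reproduces that failure mode with a made-up module, `fake_task_runner`; it does not import Airflow.

```python
import sys
import types

# Hypothetical stand-in for a module whose SUPERVISOR_COMMS attribute is
# only assigned when a task process starts. The module name is made up.
fake_runner = types.ModuleType("fake_task_runner")
sys.modules["fake_task_runner"] = fake_runner


def get_one_like():
    # Lazy import inside the function body, mirroring the last frame
    # of the traceback in this report.
    from fake_task_runner import SUPERVISOR_COMMS

    return SUPERVISOR_COMMS


def try_lookup():
    try:
        return get_one_like()
    except ImportError as exc:
        return f"ImportError: {exc}"


# Outside a task process the attribute was never assigned.
outside_task = try_lookup()

# Simulate task startup by assigning the attribute, then retry.
fake_runner.SUPERVISOR_COMMS = object()
inside_task = try_lookup()
```

If this reading is right, the same `get_link()` code would succeed inside a running task and fail in the API server, which matches the behaviour described above.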
### What you think should happen instead?
Using XCom.get_value(key="data_doc", ti_key=ti_key) inside
BaseOperatorLink.get_link() shouldn’t crash the /api/v2/.../links endpoint.
If there is no XCom row, I’d expect None or similar; the extra link should
either:
- return None/empty and be hidden, or
- return the URL string and render normally.
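
Until the lookup works in the API server, a plugin could guard the call so that the link degrades to None instead of producing a 500. The helper below is a minimal sketch of that idea; `resolve_link` and its parameters are hypothetical names, not part of Airflow, and the guard only hides the underlying error rather than fixing it.

```python
from typing import Any, Callable, Optional


def resolve_link(
    fetch_xcom: Callable[[], Any], fallback: Optional[str] = None
) -> Optional[str]:
    """Return the fetched value, or `fallback` if the lookup cannot run.

    `fetch_xcom` is any zero-argument callable; in a plugin it would wrap
    the XCom lookup (an assumption for this sketch).
    """
    try:
        value = fetch_xcom()
    except ImportError:
        # The lookup is unavailable in this process; degrade gracefully.
        return fallback
    # Treat a missing or empty XCom value the same as no link.
    return value or fallback
```

Inside `get_link()`, this could be called as `resolve_link(lambda: XCom.get_value(key="data_doc", ti_key=ti_key))`, so that a failed lookup yields None and the link is hidden.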
### How to reproduce
1. Add the plugin file below to $AIRFLOW_HOME/plugins (or the Astro
project plugins/ directory).
```
from airflow.providers.common.compat.sdk import BaseOperator, BaseOperatorLink, XCom
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.standard.operators.python import PythonOperator


class RerunTask(BaseOperatorLink):
    name = "Rerun Task"

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
        # This XCom lookup is the call that triggers the ImportError.
        azure_url_code = XCom.get_value(key="data_doc", ti_key=ti_key)
        return "https://www.astronomer.io/"


class PythonOperatorLink(BaseOperatorLink):
    name = "GE Result"

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
        azure_url_code = XCom.get_value(key="data_doc", ti_key=ti_key)
        return 'https://google.com'


class AirflowExtraLinkPlugin(AirflowPlugin):
    name = "extra_link_plugin"
    operator_extra_links = [
        RerunTask(),
    ]
    global_operator_extra_links = [
        PythonOperatorLink(),
        RerunTask(),
    ]
```
2. Restart Airflow.
3. Trigger the example_extra_links DAG once.
```
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime

with DAG(
    "example_extra_links",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
):

    def my_task(**context):
        # Push the XCom that the extra link plugin later tries to read.
        context["ti"].xcom_push(key="data_doc", value="https://example.com/ge_result")

    python_task = PythonOperator(
        task_id="python_task_traditional",
        python_callable=my_task,
    )
```
4. Go to Grid view → click python_task → open Details.
5. API Server logs show the SUPERVISOR_COMMS ImportError.
### Operating System
macOS
### Versions of Apache Airflow Providers
_No response_
### Deployment
Astronomer
### Deployment details
_No response_
### Anything else?
**Full Traceback**:
```
INFO: 192.168.107.1:52298 - "GET /api/v2/dags/example_extra_links/dagRuns/manual__2025-12-05T11%3A40%3A13%2B00%3A00/taskInstances/python_task_traditional/links?map_index=-1 HTTP/1.1" 500 Internal Server Error
ERROR: Exception in ASGI application
+ Exception Group Traceback (most recent call last):
| File "/usr/local/lib/python3.12/site-packages/starlette/_utils.py",
line 79, in collapse_excgroups
| yield
| File
"/usr/local/lib/python3.12/site-packages/starlette/middleware/base.py", line
183, in __call__
| async with anyio.create_task_group() as task_group:
| ^^^^^^^^^^^^^^^^^^^^^^^^^
| File
"/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line
781, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File
"/usr/local/lib/python3.12/site-packages/uvicorn/protocols/http/httptools_impl.py",
line 409, in run_asgi
| result = await app( # type: ignore[func-returns-value]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File
"/usr/local/lib/python3.12/site-packages/fastapi/applications.py", line 1082,
in __call__
| await super().__call__(scope, receive, send)
| File
"/usr/local/lib/python3.12/site-packages/starlette/applications.py", line 113,
in __call__
| await self.middleware_stack(scope, receive, send)
| File
"/usr/local/lib/python3.12/site-packages/starlette/middleware/errors.py", line
186, in __call__
| raise exc
| File
"/usr/local/lib/python3.12/site-packages/starlette/middleware/errors.py", line
164, in __call__
| await self.app(scope, receive, _send)
| File
"/usr/local/lib/python3.12/site-packages/starlette/middleware/gzip.py", line
29, in __call__
| await responder(scope, receive, send)
| File
"/usr/local/lib/python3.12/site-packages/starlette/middleware/gzip.py", line
130, in __call__
| await super().__call__(scope, receive, send)
| File
"/usr/local/lib/python3.12/site-packages/starlette/middleware/gzip.py", line
46, in __call__
| await self.app(scope, receive, self.send_with_compression)
| File
"/usr/local/lib/python3.12/site-packages/starlette/middleware/cors.py", line
85, in __call__
| await self.app(scope, receive, send)
| File
"/usr/local/lib/python3.12/site-packages/starlette/middleware/base.py", line
182, in __call__
| with recv_stream, send_stream, collapse_excgroups():
| ^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
| self.gen.throw(value)
| File "/usr/local/lib/python3.12/site-packages/starlette/_utils.py",
line 85, in collapse_excgroups
| raise exc
| File
"/usr/local/lib/python3.12/site-packages/starlette/middleware/base.py", line
184, in __call__
| response = await self.dispatch_func(request, call_next)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File
"/usr/local/lib/python3.12/site-packages/astronomer/runtime/plugin.py", line
90, in dispatch
| response = await call_next(request)
| ^^^^^^^^^^^^^^^^^^^^^^^^
| File
"/usr/local/lib/python3.12/site-packages/starlette/middleware/base.py", line
159, in call_next
| raise app_exc
| File
"/usr/local/lib/python3.12/site-packages/starlette/middleware/base.py", line
144, in coro
| await self.app(scope, receive_or_disconnect, send_no_error)
| File
"/usr/local/lib/python3.12/site-packages/starlette/middleware/exceptions.py",
line 63, in __call__
| await wrap_app_handling_exceptions(self.app, conn)(scope, receive,
send)
| File
"/usr/local/lib/python3.12/site-packages/starlette/_exception_handler.py", line
53, in wrapped_app
| raise exc
| File
"/usr/local/lib/python3.12/site-packages/starlette/_exception_handler.py", line
42, in wrapped_app
| await app(scope, receive, sender)
| File "/usr/local/lib/python3.12/site-packages/starlette/routing.py",
line 716, in __call__
| await self.middleware_stack(scope, receive, send)
| File "/usr/local/lib/python3.12/site-packages/starlette/routing.py",
line 736, in app
| await route.handle(scope, receive, send)
| File "/usr/local/lib/python3.12/site-packages/starlette/routing.py",
line 290, in handle
| await self.app(scope, receive, send)
| File "/usr/local/lib/python3.12/site-packages/starlette/routing.py",
line 78, in app
| await wrap_app_handling_exceptions(app, request)(scope, receive,
send)
| File
"/usr/local/lib/python3.12/site-packages/starlette/_exception_handler.py", line
53, in wrapped_app
| raise exc
| File
"/usr/local/lib/python3.12/site-packages/starlette/_exception_handler.py", line
42, in wrapped_app
| await app(scope, receive, sender)
| File "/usr/local/lib/python3.12/site-packages/starlette/routing.py",
line 75, in app
| response = await f(request)
| ^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.12/site-packages/fastapi/routing.py",
line 308, in app
| raw_response = await run_endpoint_function(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.12/site-packages/fastapi/routing.py",
line 221, in run_endpoint_function
| return await run_in_threadpool(dependant.call, **values)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File
"/usr/local/lib/python3.12/site-packages/starlette/concurrency.py", line 38, in
run_in_threadpool
| return await anyio.to_thread.run_sync(func)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/lib/python3.12/site-packages/anyio/to_thread.py",
line 56, in run_sync
| return await get_async_backend().run_sync_in_worker_thread(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File
"/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line
2485, in run_sync_in_worker_thread
| return await future
| ^^^^^^^^^^^^
| File
"/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line
976, in run
| result = context.run(func, *args)
| ^^^^^^^^^^^^^^^^^^^^^^^^
| File
"/usr/local/lib/python3.12/site-packages/airflow/api_fastapi/core_api/routes/public/extra_links.py",
line 81, in get_extra_links
| all_extra_links = {link_name: link_url or None for link_name,
link_url in sorted(all_extra_link_pairs)}
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File
"/usr/local/lib/python3.12/site-packages/airflow/api_fastapi/core_api/routes/public/extra_links.py",
line 79, in <genexpr>
| (link_name, task.get_extra_links(ti, link_name)) for link_name in
task.extra_links
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File
"/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py",
line 1407, in get_extra_links
| return link.get_link(self, ti_key=ti.key) # type: ignore[arg-type]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/usr/local/airflow/plugins/extra_link_plugin.py", line 21, in
get_link
| azure_url_code = XCom.get_value(key="data_doc", ti_key=ti_key)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File
"/usr/local/lib/python3.12/site-packages/airflow/sdk/bases/xcom.py", line 153,
in get_value
| return cls.get_one(
| ^^^^^^^^^^^^
| File
"/usr/local/lib/python3.12/site-packages/airflow/sdk/bases/xcom.py", line 246,
in get_one
| from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
| ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)
```
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)