amoghrajesh opened a new issue, #45341:
URL: https://github.com/apache/airflow/issues/45341
### Body
Example DAG:
```
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
def push_to_xcom(**kwargs):
    """Return a fixed greeting string.

    Used as the ``python_callable`` of ``push_xcom_task``; the returned value
    is what the operator is expected to store as that task's XCom.
    """
    value = "Hello, XCom!"
    return value
def pull_from_xcom(**kwargs):
    """Pull an XCom from a non-existent task and print whatever comes back.

    Expects the task instance to be passed in as ``kwargs['ti']``.
    """
    ti = kwargs['ti']
    # 'invalid_id' is deliberately NOT a task in this DAG: pulling from it is
    # what triggers the behaviour reported in this issue.
    xcom_value = ti.xcom_pull(task_ids='invalid_id')
    print(f"Retrieved XCom Value: {xcom_value}")
# Two-task DAG: push_xcom_task runs first, then pull_xcom_task tries to read
# an XCom from a task id that does not exist.
with DAG(
    'xcom_example',
    schedule=None,
    catchup=False,
) as dag:
    push_xcom_task = PythonOperator(
        task_id='push_xcom_task',
        python_callable=push_to_xcom,
    )
    pull_xcom_task = PythonOperator(
        task_id='pull_xcom_task',
        python_callable=pull_from_xcom,
    )
    # Run the push before the pull.
    push_xcom_task >> pull_xcom_task
```
Here, no task with the id `invalid_id` exists in the DAG, so the XCom pull
should fail gracefully.
Instead, the executor just crashed:
```
[2024-12-31T06:53:15.741+0000] {_client.py:1026} INFO - HTTP Request: GET
http://localhost:9091/execution/xcoms/xcom_example/manual__2024-12-31T06:53:14.523233+00:00/invalid_id/return_value?map_index=-1
"HTTP/1.1 404 Not Found"
2024-12-31 06:53:15 [warning ] Server error
[airflow.sdk.api.client] detail={'detail': {'reason': 'not_found', 'message':
"XCom with key 'return_value' not found for task 'invalid_id' in DAG
'xcom_example'"}}
[2024-12-31T06:53:15.742+0000] {local_executor.py:96} ERROR - uhoh
Traceback (most recent call last):
File "/opt/airflow/airflow/executors/local_executor.py", line 92, in
_run_worker
_execute_work(log, workload)
File "/opt/airflow/airflow/executors/local_executor.py", line 113, in
_execute_work
supervise(
File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py",
line 898, in supervise
exit_code = process.wait()
File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py",
line 512, in wait
self._monitor_subprocess()
File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py",
line 554, in _monitor_subprocess
alive = self._service_subprocess(max_wait_time=max_wait_time) is None
File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py",
line 598, in _service_subprocess
need_more = socket_handler(key.fileobj)
File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py",
line 785, in cb
gen.send(line)
File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py",
line 708, in handle_requests
self._handle_request(msg, log)
File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py",
line 728, in _handle_request
xcom = self.client.xcoms.get(msg.dag_id, msg.run_id, msg.task_id,
msg.key, msg.map_index)
File "/opt/airflow/task_sdk/src/airflow/sdk/api/client.py", line 222, in
get
resp = self.client.get(f"xcoms/{dag_id}/{run_id}/{task_id}/{key}",
params=params)
File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1054,
in get
return self.request(
File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line
336, in wrapped_f
return copy(f, *args, **kw)
File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line
475, in __call__
do = self.iter(retry_state=retry_state)
File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line
376, in iter
result = action(retry_state)
File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line
398, in <lambda>
self._add_action_func(lambda rs: rs.outcome.result())
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 439, in
result
return self.__get_result()
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in
__get_result
raise self._exception
File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line
478, in __call__
result = fn(*args, **kwargs)
File "/opt/airflow/task_sdk/src/airflow/sdk/api/client.py", line 317, in
request
return super().request(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 827,
in request
return self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 914,
in send
response = self._send_handling_auth(
File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 942,
in _send_handling_auth
response = self._send_handling_redirects(
File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 999,
in _send_handling_redirects
raise exc
File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 982,
in _send_handling_redirects
hook(response)
File "/opt/airflow/task_sdk/src/airflow/sdk/api/client.py", line 93, in
raise_on_4xx_5xx
return get_json_error(response) or response.raise_for_status()
File "/opt/airflow/task_sdk/src/airflow/sdk/api/client.py", line 89, in
get_json_error
raise err
airflow.sdk.api.client.ServerResponseError: Server returned error
```
Legacy Airflow simply ignores such cases: `xcom_pull` returns `None` and the task carries on.

### Committer
- [X] I acknowledge that I am a maintainer/committer of the Apache Airflow
project.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]