amoghrajesh opened a new issue, #45341:
URL: https://github.com/apache/airflow/issues/45341

   ### Body
   
   Example DAG:
   ```
   from airflow import DAG
   from airflow.providers.standard.operators.python import PythonOperator
   
   def push_to_xcom(**kwargs):
       value = "Hello, XCom!"
       return value
   
   
   def pull_from_xcom(**kwargs):
       ti = kwargs['ti']
       xcom_value = ti.xcom_pull(task_ids='invalid_id')
       print(f"Retrieved XCom Value: {xcom_value}")
   
   
   with DAG(
       'xcom_example',
       schedule=None,
       catchup=False,
   ) as dag:
   
       push_xcom_task = PythonOperator(
           task_id='push_xcom_task',
           python_callable=push_to_xcom,
       )
   
       pull_xcom_task = PythonOperator(
           task_id='pull_xcom_task',
           python_callable=pull_from_xcom,
       )
   
       push_xcom_task >> pull_xcom_task
   ```
   
   Here the `invalid_id` task id doesn't exist. So the XCOM pull should fail 
gracefully.
   Instead, the executor just crashed:
   ```
   [2024-12-31T06:53:15.741+0000] {_client.py:1026} INFO - HTTP Request: GET 
http://localhost:9091/execution/xcoms/xcom_example/manual__2024-12-31T06:53:14.523233+00:00/invalid_id/return_value?map_index=-1
 "HTTP/1.1 404 Not Found"
   2024-12-31 06:53:15 [warning  ] Server error                   
[airflow.sdk.api.client] detail={'detail': {'reason': 'not_found', 'message': 
"XCom with key 'return_value' not found for task 'invalid_id' in DAG 
'xcom_example'"}}
   [2024-12-31T06:53:15.742+0000] {local_executor.py:96} ERROR - uhoh
   Traceback (most recent call last):
     File "/opt/airflow/airflow/executors/local_executor.py", line 92, in 
_run_worker
       _execute_work(log, workload)
     File "/opt/airflow/airflow/executors/local_executor.py", line 113, in 
_execute_work
       supervise(
     File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py", 
line 898, in supervise
       exit_code = process.wait()
     File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py", 
line 512, in wait
       self._monitor_subprocess()
     File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py", 
line 554, in _monitor_subprocess
       alive = self._service_subprocess(max_wait_time=max_wait_time) is None
     File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py", 
line 598, in _service_subprocess
       need_more = socket_handler(key.fileobj)
     File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py", 
line 785, in cb
       gen.send(line)
     File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py", 
line 708, in handle_requests
       self._handle_request(msg, log)
     File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py", 
line 728, in _handle_request
       xcom = self.client.xcoms.get(msg.dag_id, msg.run_id, msg.task_id, 
msg.key, msg.map_index)
     File "/opt/airflow/task_sdk/src/airflow/sdk/api/client.py", line 222, in 
get
       resp = self.client.get(f"xcoms/{dag_id}/{run_id}/{task_id}/{key}", 
params=params)
     File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1054, 
in get
       return self.request(
     File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 
336, in wrapped_f
       return copy(f, *args, **kw)
     File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 
475, in __call__
       do = self.iter(retry_state=retry_state)
     File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 
376, in iter
       result = action(retry_state)
     File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 
398, in <lambda>
       self._add_action_func(lambda rs: rs.outcome.result())
     File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 439, in 
result
       return self.__get_result()
     File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in 
__get_result
       raise self._exception
     File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 
478, in __call__
       result = fn(*args, **kwargs)
     File "/opt/airflow/task_sdk/src/airflow/sdk/api/client.py", line 317, in 
request
       return super().request(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 827, 
in request
       return self.send(request, auth=auth, follow_redirects=follow_redirects)
     File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 914, 
in send
       response = self._send_handling_auth(
     File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 942, 
in _send_handling_auth
       response = self._send_handling_redirects(
     File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 999, 
in _send_handling_redirects
       raise exc
     File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 982, 
in _send_handling_redirects
       hook(response)
     File "/opt/airflow/task_sdk/src/airflow/sdk/api/client.py", line 93, in 
raise_on_4xx_5xx
       return get_json_error(response) or response.raise_for_status()
     File "/opt/airflow/task_sdk/src/airflow/sdk/api/client.py", line 89, in 
get_json_error
       raise err
   airflow.sdk.api.client.ServerResponseError: Server returned error
   ```
   
   Legacy Airflow just ignores such cases and moves on with a return.
   ![image 
(16)](https://github.com/user-attachments/assets/879de435-a27b-4bd3-82e3-2f315bc15166)
   
   
   ### Committer
   
   - [X] I acknowledge that I am a maintainer/committer of the Apache Airflow 
project.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to