DickeyLen opened a new issue, #52621:
URL: https://github.com/apache/airflow/issues/52621

   ### Apache Airflow version
   
   3.0.2
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   When using the Kubernetes Executor with tasks that define 
`on_failure_callback`, a crashed or failed Kubernetes pod sends the task 
into an infinite loop instead of marking it as failed. The 
scheduler tries to run the failure callback by generating a 
`TaskCallbackRequest`, but handling of `TaskCallbackRequest` in 
`dag_processing.processor.py` is not implemented (it raises a 
`NotImplementedError`), so the task state never transitions to FAILED.
   
   The logs show the following errors repeating:
   
   <img width="1317" alt="Image" src="https://github.com/user-attachments/assets/6f9a2b23-4191-4798-a8aa-73c680cc21d9" />
   
   ### What you think should happen instead?
   
   In previous versions, a task with `on_failure_callback` defined would still 
have its state updated. Now the handling of `TaskCallbackRequest` is 
temporarily unimplemented:
   ```
   if isinstance(request, TaskCallbackRequest):
       raise NotImplementedError(
           "Haven't coded Task callback yet - https://github.com/apache/airflow/issues/44354!"
       )
       # _execute_task_callbacks(dagbag, request)
   ```
   This results in the task state never being changed, requiring manual 
intervention. The proper implementation should handle the TaskCallbackRequest, 
execute the callback if defined, and most importantly, ensure the task state 
transitions to FAILED regardless of the callback's execution status.
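
To make the expected behavior concrete, here is a minimal sketch of the ordering described above. It is not Airflow code: `State`, `FakeTaskInstance`, and `handle_task_failure` are hypothetical names invented for illustration, and the real fix would have to live in the scheduler or DAG processor. The only point it shows is that the state change sits in a `finally` block, so a missing or raising callback cannot prevent it.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, Optional


class State(str, Enum):
    """Hypothetical, reduced set of task states (not Airflow's own enum)."""
    RUNNING = "running"
    FAILED = "failed"


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance; not Airflow's TaskInstance."""
    task_id: str
    state: State = State.RUNNING


Callback = Callable[[Dict[str, Any]], None]


def handle_task_failure(
    ti: FakeTaskInstance, on_failure_callback: Optional[Callback] = None
) -> Optional[Exception]:
    """Run the callback if one is defined, then always mark the task FAILED.

    Returns the exception raised by the callback, or None if it succeeded
    or no callback was defined.
    """
    callback_error: Optional[Exception] = None
    try:
        if on_failure_callback is not None:
            on_failure_callback({"task_instance": ti})
    except Exception as exc:
        # A broken callback is recorded but must not block the state change.
        callback_error = exc
    finally:
        ti.state = State.FAILED
    return callback_error


def raising_callback(context: Dict[str, Any]) -> None:
    """Example callback that itself fails."""
    raise RuntimeError("callback blew up")
```

With this shape, `handle_task_failure(FakeTaskInstance("failing_task"), raising_callback)` leaves the instance in `State.FAILED` and returns the `RuntimeError` for logging, rather than leaving the task stuck.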
   
   ### How to reproduce
   
   ```
   from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
   
   from airflow import DAG
   from kubernetes.client import models as k8s
   
   def failure_callback(context):
       # Double quotes inside the f-string keep this valid on Python < 3.12.
       print(f'Task failed: {context["task_instance"]}')
   
   
   with DAG('test_failure_callback', schedule='@daily') as dag:
       task = KubernetesPodOperator(
           task_id='failing_task',
           name='failing_task',
           namespace='airflow',
           image='python:3.11-slim',
           cmds=['bash', '-c'],
           arguments=['python -c "import time; time.sleep(300);"'],
           labels={'example': 'test'},
           container_resources=k8s.V1ResourceRequirements(
               requests={'cpu': '500m', 'memory': '500Mi'},
               limits={'cpu': '500m', 'memory': '500Mi'},
           ),
           on_failure_callback=failure_callback,
       )
   ```
   After the Kubernetes executor starts, enter the pod and run Python code that 
causes an OOM to simulate a pod crash.
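
For the OOM step, something like the following sketch could be run inside the pod. The function name `allocate_until_killed` and its parameters are my own, and whether the container is actually OOM-killed depends on the memory limit in effect (500Mi in the DAG above) and on how the cluster enforces it. The chunks are filled with non-zero bytes so the memory is really touched rather than lazily reserved.

```python
from typing import List, Optional


def allocate_until_killed(chunk_mib: int = 50, max_chunks: Optional[int] = None) -> int:
    """Keep allocating memory in chunk_mib-sized chunks.

    With max_chunks=None this loops until the process is killed (or a
    MemoryError is raised). max_chunks exists only so the function can be
    tried safely outside the pod; it returns the number of chunks held.
    """
    chunks: List[bytearray] = []
    size = chunk_mib * 1024 * 1024
    while max_chunks is None or len(chunks) < max_chunks:
        # Non-zero fill forces the pages to be written, so usage really grows.
        chunks.append(bytearray(b"x") * size)
    return len(chunks)
```

Inside the pod, calling `allocate_until_killed()` with no cap should push usage past the limit; outside it, `allocate_until_killed(chunk_mib=1, max_chunks=3)` is a harmless dry run.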
   
   ### Operating System
   
   Linux
   
   ### Versions of Apache Airflow Providers
   
   ```
   apache-airflow-providers-amazon==9.8.0
   apache-airflow-providers-celery==3.11.0
   apache-airflow-providers-cncf-kubernetes==10.5.0
   apache-airflow-providers-common-compat==1.7.0
   apache-airflow-providers-common-io==1.6.0
   apache-airflow-providers-common-messaging==1.0.3
   apache-airflow-providers-common-sql==1.27.1
   apache-airflow-providers-docker==4.4.0
   apache-airflow-providers-elasticsearch==6.3.0
   apache-airflow-providers-fab==2.2.1
   apache-airflow-providers-ftp==3.13.0
   apache-airflow-providers-git==0.0.2
   apache-airflow-providers-google==15.1.0
   apache-airflow-providers-grpc==3.8.0
   apache-airflow-providers-hashicorp==4.2.0
   apache-airflow-providers-http==5.3.0
   apache-airflow-providers-microsoft-azure==12.4.0
   apache-airflow-providers-mysql==6.3.0
   apache-airflow-providers-odbc==4.10.0
   apache-airflow-providers-openlineage==2.3.0
   apache-airflow-providers-postgres==6.2.0
   apache-airflow-providers-redis==4.1.0
   apache-airflow-providers-sendgrid==4.1.0
   apache-airflow-providers-sftp==5.3.0
   apache-airflow-providers-slack==9.1.0
   apache-airflow-providers-smtp==2.1.0
   apache-airflow-providers-snowflake==6.3.1
   apache-airflow-providers-ssh==4.1.0
   apache-airflow-providers-standard==1.2.0
   ```
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
