DickeyLen opened a new issue, #52621: URL: https://github.com/apache/airflow/issues/52621
### Apache Airflow version

3.0.2

### If "Other Airflow 2 version" selected, which one?

_No response_

### What happened?

When using the Kubernetes Executor with tasks that have `on_failure_callback` defined, if a Kubernetes pod crashes or fails, the task enters an infinite loop instead of being properly marked as failed. The scheduler attempts to execute the failure callback by generating a `TaskCallbackRequest`, but since the handling of `TaskCallbackRequest` in `dag_processing.processor.py` is not implemented (it raises a `NotImplementedError`), the task state never transitions to FAILED.

The logs show the following errors repeating:

<img width="1317" alt="Image" src="https://github.com/user-attachments/assets/6f9a2b23-4191-4798-a8aa-73c680cc21d9" />

### What you think should happen instead?

In previous versions, `on_failure_callback` would still change the task state, but now it's temporarily unimplemented:

```
    if isinstance(request, TaskCallbackRequest):
        raise NotImplementedError(
            "Haven't coded Task callback yet - https://github.com/apache/airflow/issues/44354!"
        )
        # _execute_task_callbacks(dagbag, request)
```

This results in the task state never being changed, requiring manual intervention. The proper implementation should handle the `TaskCallbackRequest`, execute the callback if defined, and most importantly, ensure the task state transitions to FAILED regardless of the callback's execution status.
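To make the expected behavior concrete, here is a minimal sketch of the ordering I have in mind. It does not use Airflow's real classes: `FakeTaskInstance` and `handle_failure_request` are hypothetical names made up for illustration, and the real fix would have to go through Airflow's own models and state handling. The only point is that the transition to FAILED happens whether or not the callback exists or raises.

```python
from __future__ import annotations

import logging
from dataclasses import dataclass
from typing import Any, Callable, Optional

log = logging.getLogger(__name__)


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance; NOT Airflow's model."""

    task_id: str
    state: str = "running"


def handle_failure_request(
    ti: FakeTaskInstance,
    on_failure_callback: Optional[Callable[[dict[str, Any]], None]] = None,
) -> None:
    """Run the failure callback if one is defined, then always mark the task failed."""
    try:
        if on_failure_callback is not None:
            on_failure_callback({"task_instance": ti})
    except Exception:
        # A broken callback is logged but must not block the state transition.
        log.exception("on_failure_callback raised for task %s", ti.task_id)
    finally:
        # Reached on every path: no callback, callback succeeded, callback raised.
        ti.state = "failed"


def broken_callback(context: dict[str, Any]) -> None:
    raise RuntimeError("callback blew up")


ti = FakeTaskInstance(task_id="failing_task")
handle_failure_request(ti, broken_callback)
print(ti.state)  # prints "failed"
```

Even with a callback that raises, the task ends up in the failed state instead of being retried forever.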
### How to reproduce

```
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow import DAG
from kubernetes.client import models as k8s


def failure_callback(context):
    # Double quotes outside, single quotes inside: reusing the same quote
    # character inside an f-string is a syntax error before Python 3.12.
    print(f"Task failed: {context['task_instance']}")


with DAG('test_failure_callback', schedule='@daily') as dag:
    task = KubernetesPodOperator(
        task_id='failing_task',
        name='failing_task',
        namespace='airflow',
        image='python:3.11-slim',
        cmds=['bash', '-c'],
        arguments=['python -c "import time; time.sleep(300);"'],
        labels={'example': 'test'},
        container_resources=k8s.V1ResourceRequirements(
            requests={'cpu': '500m', 'memory': '500Mi'},
            limits={'cpu': '500m', 'memory': '500Mi'},
        ),
        on_failure_callback=failure_callback,
    )
```

After the Kubernetes executor starts the task pod, enter the pod and run Python code that causes an OOM to simulate a pod crash.

### Operating System

Linux

### Versions of Apache Airflow Providers

```
apache-airflow-providers-amazon==9.8.0
apache-airflow-providers-celery==3.11.0
apache-airflow-providers-cncf-kubernetes==10.5.0
apache-airflow-providers-common-compat==1.7.0
apache-airflow-providers-common-io==1.6.0
apache-airflow-providers-common-messaging==1.0.3
apache-airflow-providers-common-sql==1.27.1
apache-airflow-providers-docker==4.4.0
apache-airflow-providers-elasticsearch==6.3.0
apache-airflow-providers-fab==2.2.1
apache-airflow-providers-ftp==3.13.0
apache-airflow-providers-git==0.0.2
apache-airflow-providers-google==15.1.0
apache-airflow-providers-grpc==3.8.0
apache-airflow-providers-hashicorp==4.2.0
apache-airflow-providers-http==5.3.0
apache-airflow-providers-microsoft-azure==12.4.0
apache-airflow-providers-mysql==6.3.0
apache-airflow-providers-odbc==4.10.0
apache-airflow-providers-openlineage==2.3.0
apache-airflow-providers-postgres==6.2.0
apache-airflow-providers-redis==4.1.0
apache-airflow-providers-sendgrid==4.1.0
apache-airflow-providers-sftp==5.3.0
apache-airflow-providers-slack==9.1.0
apache-airflow-providers-smtp==2.1.0
apache-airflow-providers-snowflake==6.3.1
apache-airflow-providers-ssh==4.1.0
apache-airflow-providers-standard==1.2.0
```

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
