wolfier opened a new issue, #51969: URL: https://github.com/apache/airflow/issues/51969
### Apache Airflow version

Other Airflow 2 version (please specify below)

### If "Other Airflow 2 version" selected, which one?

2.10.5

### What happened?

When a worker terminates abruptly, the task instances (and their corresponding LocalTaskJobs) on that worker stop producing heartbeats. Those TIs are then found and purged as task instances without heartbeats (zombies). By default, the scheduler looks for these TIs every 10 seconds (configured by [zombie_detection_interval](https://airflow.apache.org/docs/apache-airflow/2.10.5/configurations-ref.html#zombie-detection-interval)).

The issue manifests when the scheduler finds the same TI again at a later interval, usually the next one, before the initial purge completes. More specifically, the TI is found again before it has been moved out of the `running` state, and being `running` is one of the conditions for a TI to be found as a TI without heartbeats. This behaviour was also described in https://github.com/apache/airflow/issues/32289.

This happened to me when a TI with `on_failure_callback` and retries was found as a TI without heartbeats and marked as `up_for_retry`. However, before the task was marked as `up_for_retry`, it was found as a TI without heartbeats again. Now there are **TWO** callbacks requested. The TI then ran its next attempt but was marked as `failed` mid-execution.

This is a simplified version of the timeline for the above scenario.

1. task starts 1st attempt but the command fails [TI state: `running`]
2. task found as a TI without heartbeats and the scheduler sends out a callback request [time x] [TI state: `running`]
3. task found as a zombie again and the scheduler sends out another callback request [time x+10] [TI state: `running`]
4. first callback request processed [time x+11] [TI state: `up_for_retry`]
5. task starts 2nd attempt [TI state: `running`]
6. second callback request processed and downstream tasks marked as `upstream_failed` [TI state: `failed`]
7. task finishes and, due to task logic, is marked as skipped [TI state: `skipped`]

### What you think should happen instead?

The scheduler [sends the callback request](https://github.com/apache/airflow/blob/2.10.5/airflow/jobs/scheduler_job_runner.py#L2116) to the DagFileProcessorManager through the executor. In Airflow 2.10.5, [all callback requests sent this way are queued](https://github.com/apache/airflow/blob/2.10.5/airflow/dag_processing/manager.py#L779).

I think the issue is that the scheduler does not know whether a callback request for the same task attempt has already been sent, or whether that request has completed. Without this information, the scheduler may send out more callbacks than necessary.

### How to reproduce

I was able to reproduce the issue with the instructions in https://github.com/apache/airflow/issues/32289. I tried a similar DAG and the issue is reproducible.

Here is the Airflow configuration to help reproduce the issue.

```
ENV AIRFLOW__CORE__DAG_DIR_LIST_INTERVAL=5
ENV AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=120
ENV AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT=150
ENV AIRFLOW__SCHEDULER__ZOMBIE_DETECTION_INTERVAL=10
ENV AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD=10
```

<details>
<summary>Here is the DAG</summary>

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
import time
import logging
from datetime import datetime, timedelta

# Slows down parsing of this DAG file by ~15 s, widening the window in which
# the same zombie TI can be detected again before its callback is processed.
time.sleep(15)


def sleep(**context):
    # Keep the task running long enough to kill its worker mid-execution.
    time.sleep(600)


def alert(context):
    logging.info(f"{context['task_instance'].task_id} failed")


with DAG(
    "my_dag",
    schedule_interval="@daily",
    start_date=datetime(2025, 1, 1),
    max_active_runs=1,
    catchup=False,
) as dag:
    task = PythonOperator(
        task_id="test",
        python_callable=sleep,
        retries=0,
        # retry quickly to ensure the task will be successful when the second clean up occurs
        retry_delay=timedelta(seconds=1),
        on_failure_callback=alert,
    )
```

</details>

First, let the task run.
The task is assigned to the worker `critical-eccentricity-9882-worker-default-55bdb445b8-7bb92`.

```
[2025-06-20T00:40:40.407+0000] {scheduler_job_runner.py:803} INFO - Setting external_id for <TaskInstance: my_dag.test scheduled__2025-06-19T00:00:00+00:00 [queued]> to ca9332a5-26b3-4d3f-a2c8-3ac6196f8f08
...
[2025-06-20 00:40:40,407: INFO/ForkPoolWorker-6] [ca9332a5-26b3-4d3f-a2c8-3ac6196f8f08] Executing command in Celery: ['airflow', 'tasks', 'run', 'my_dag', 'test', 'scheduled__2025-06-19T00:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/zombie.py']
```

Then kill the worker while the TI is executing. In this case, I abruptly deleted the worker at `2025-06-20T00:41:05`.

```
alan@Alans-MacBook-Pro test % kubectl delete pod critical-eccentricity-9882-worker-default-55bdb445b8-7bb92 --grace-period=0
pod "critical-eccentricity-9882-worker-default-55bdb445b8-7bb92" deleted
```

The scheduler detected the missing heartbeats from the TI (LocalTaskJob). This is the **FIRST** time.

```
[2025-06-20T00:41:18.162+0000] {scheduler_job_runner.py:2086} WARNING - Failing (1) jobs without heartbeat after 2025-06-20 00:41:08.156992+00:00
[2025-06-20T00:41:18.162+0000] {scheduler_job_runner.py:2110} ERROR - Detected zombie job: {'full_filepath': '/usr/local/airflow/dags/zombie.py', 'processor_subdir': '/usr/local/airflow/dags', 'msg': "{'DAG Id': 'my_dag', 'Task Id': 'test', 'Run Id': 'scheduled__2025-06-19T00:00:00+00:00', 'Hostname': '172.20.12.44', 'External Executor Id': 'ca9332a5-26b3-4d3f-a2c8-3ac6196f8f08'}", 'simple_task_instance': SimpleTaskInstance(dag_id='my_dag', task_id='test', run_id='scheduled__2025-06-19T00:00:00+00:00', map_index=-1, start_date=datetime.datetime(2025, 6, 20, 0, 40, 55, 552863, tzinfo=Timezone('UTC')), end_date=None, try_number=1, state='running', executor=None, executor_config={}, run_as_user=None, pool='default_pool', priority_weight=1, queue='default', key=TaskInstanceKey(dag_id='my_dag', task_id='test', run_id='scheduled__2025-06-19T00:00:00+00:00', try_number=1, map_index=-1)), 'task_callback_type': None} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)
```

The scheduler detected the TI as a zombie (TI without heartbeats) again. This is the **SECOND** time.

```
[2025-06-20T00:41:28.230+0000] {scheduler_job_runner.py:2086} WARNING - Failing (1) jobs without heartbeat after 2025-06-20 00:41:18.224929+00:00
[2025-06-20T00:41:28.230+0000] {scheduler_job_runner.py:2110} ERROR - Detected zombie job: {'full_filepath': '/usr/local/airflow/dags/zombie.py', 'processor_subdir': '/usr/local/airflow/dags', 'msg': "{'DAG Id': 'my_dag', 'Task Id': 'test', 'Run Id': 'scheduled__2025-06-19T00:00:00+00:00', 'Hostname': '172.20.12.44', 'External Executor Id': 'ca9332a5-26b3-4d3f-a2c8-3ac6196f8f08'}", 'simple_task_instance': SimpleTaskInstance(dag_id='my_dag', task_id='test', run_id='scheduled__2025-06-19T00:00:00+00:00', map_index=-1, start_date=datetime.datetime(2025, 6, 20, 0, 40, 55, 552863, tzinfo=Timezone('UTC')), end_date=None, try_number=1, state='running', executor=None, executor_config={}, run_as_user=None, pool='default_pool', priority_weight=1, queue='default', key=TaskInstanceKey(dag_id='my_dag', task_id='test', run_id='scheduled__2025-06-19T00:00:00+00:00', try_number=1, map_index=-1)), 'task_callback_type': None} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)
[2025-06-20T00:41:28.898+0000] {scheduler_job_runner.py:813} INFO - TaskInstance Finished: dag_id=my_dag, task_id=test, run_id=scheduled__2025-06-19T00:00:00+00:00, map_index=-1, run_start_date=2025-06-20 00:40:55.552863+00:00, run_end_date=None, run_duration=None, state=running, executor=CeleryExecutor(parallelism=100), executor_state=failed, try_number=1, max_tries=0, job_id=185, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2025-06-20 00:40:40.273765+00:00, queued_by_job_id=183, pid=59
```

The DAG processor began processing the source file `/usr/local/airflow/dags/zombie.py`.

```
[2025-06-20T00:41:28.253+0000] [SOURCE:DAG_PROCESSOR] {processor.py:914} INFO - Processing file /usr/local/airflow/dags/zombie.py for tasks to queue
[2025-06-20T00:41:28.255+0000] [SOURCE:DAG_PROCESSOR] {dagbag.py:588} INFO - Filling up the DagBag from /usr/local/airflow/dags/zombie.py
```

The scheduler detected the TI as a zombie (TI without heartbeats) again. This is the **THIRD** time.

```
[2025-06-20T00:41:38.250+0000] {scheduler_job_runner.py:2086} WARNING - Failing (1) jobs without heartbeat after 2025-06-20 00:41:28.245918+00:00
[2025-06-20T00:41:38.251+0000] {scheduler_job_runner.py:2110} ERROR - Detected zombie job: {'full_filepath': '/usr/local/airflow/dags/zombie.py', 'processor_subdir': '/usr/local/airflow/dags', 'msg': "{'DAG Id': 'my_dag', 'Task Id': 'test', 'Run Id': 'scheduled__2025-06-19T00:00:00+00:00', 'Hostname': '172.20.12.44', 'External Executor Id': 'ca9332a5-26b3-4d3f-a2c8-3ac6196f8f08'}", 'simple_task_instance': SimpleTaskInstance(dag_id='my_dag', task_id='test', run_id='scheduled__2025-06-19T00:00:00+00:00', map_index=-1, start_date=datetime.datetime(2025, 6, 20, 0, 40, 55, 552863, tzinfo=Timezone('UTC')), end_date=None, try_number=1, state='running', executor=None, executor_config={}, run_as_user=None, pool='default_pool', priority_weight=1, queue='default', key=TaskInstanceKey(dag_id='my_dag', task_id='test', run_id='scheduled__2025-06-19T00:00:00+00:00', try_number=1, map_index=-1)), 'task_callback_type': None} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)
[2025-06-20T00:41:39.052+0000] {scheduler_job_runner.py:813} INFO - TaskInstance Finished: dag_id=my_dag, task_id=test, run_id=scheduled__2025-06-19T00:00:00+00:00, map_index=-1, run_start_date=2025-06-20 00:40:55.552863+00:00, run_end_date=None, run_duration=None, state=running, executor=CeleryExecutor(parallelism=100), executor_state=failed, try_number=1, max_tries=0, job_id=185, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2025-06-20 00:40:40.273765+00:00, queued_by_job_id=183, pid=59
```

Processing finished and the DAG processor executed the callback requests from the first and second zombie purging attempts.

```
[2025-06-20T00:41:43.261+0000] [SOURCE:DAG_PROCESSOR] {processor.py:925} INFO - DAG(s) 'my_dag' retrieved from /usr/local/airflow/dags/zombie.py
[2025-06-20T00:41:43.406+0000] {taskinstance.py:1226} INFO - Marking task as FAILED. dag_id=my_dag, task_id=test, run_id=scheduled__2025-06-19T00:00:00+00:00, execution_date=20250619T000000, start_date=20250620T004055, end_date=20250620T004143
[2025-06-20T00:41:43.406+0000] {taskinstance.py:1564} INFO - Executing callback at index 0: alert
[2025-06-20T00:41:43.517+0000] {taskinstance.py:1226} INFO - Marking task as FAILED. dag_id=my_dag, task_id=test, run_id=scheduled__2025-06-19T00:00:00+00:00, execution_date=20250619T000000, start_date=20250620T004055, end_date=20250620T004143
[2025-06-20T00:41:43.517+0000] {taskinstance.py:1564} INFO - Executing callback at index 0: alert
[2025-06-20T00:41:43.599+0000] [SOURCE:DAG_PROCESSOR] {processor.py:208} INFO - Processing /usr/local/airflow/dags/zombie.py took 15.351 seconds
```

The DAG processor began processing the source file `/usr/local/airflow/dags/zombie.py` again.

```
[2025-06-20T00:41:53.832+0000] [SOURCE:DAG_PROCESSOR] {processor.py:186} INFO - Started process (PID=149) to work on /usr/local/airflow/dags/zombie.py
[2025-06-20T00:41:53.833+0000] [SOURCE:DAG_PROCESSOR] {processor.py:914} INFO - Processing file /usr/local/airflow/dags/zombie.py for tasks to queue
```

Processing finished and the DAG processor executed the callback request from the third zombie purging attempt.
```
[2025-06-20T00:42:08.840+0000] [SOURCE:DAG_PROCESSOR] {processor.py:925} INFO - DAG(s) 'my_dag' retrieved from /usr/local/airflow/dags/zombie.py
[2025-06-20T00:42:08.956+0000] {taskinstance.py:1226} INFO - Marking task as FAILED. dag_id=my_dag, task_id=test, run_id=scheduled__2025-06-19T00:00:00+00:00, execution_date=20250619T000000, start_date=20250620T004055, end_date=20250620T004208
[2025-06-20T00:42:08.956+0000] {taskinstance.py:1564} INFO - Executing callback at index 0: alert
```

There should NOT be three executions of `on_failure_callback` for one attempt of the same task instance.

### Operating System

Linux

### Versions of Apache Airflow Providers

_No response_

### Deployment

Astronomer

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
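A minimal sketch of the idea in "What you think should happen instead?", assuming the scheduler could remember which task attempts already have a failure callback in flight. `InFlightCallbackTracker` and `count_callback_requests` are hypothetical names, not Airflow APIs, and `TaskInstanceKey` here is a local stand-in with the same fields as the key printed in the zombie logs (`dag_id`, `task_id`, `run_id`, `try_number`, `map_index`). Because the key includes `try_number`, repeated scans of the same attempt collapse into one request, while a zombie on a later attempt would still get its own callback. The simulation replays the timing seen in the logs: scans 10 s apart, with the first callback processed about 25 s after the first detection.

```python
from typing import List, NamedTuple, Optional, Set


class TaskInstanceKey(NamedTuple):
    """Local stand-in with the same fields as the TaskInstanceKey in the logs."""

    dag_id: str
    task_id: str
    run_id: str
    try_number: int = 1
    map_index: int = -1


class InFlightCallbackTracker:
    """Hypothetical helper: remembers which task attempts already have a
    failure callback queued, so later zombie scans skip them."""

    def __init__(self) -> None:
        self._in_flight: Set[TaskInstanceKey] = set()

    def should_send(self, key: TaskInstanceKey) -> bool:
        # Only the first detection of a given attempt queues a callback.
        if key in self._in_flight:
            return False
        self._in_flight.add(key)
        return True

    def mark_processed(self, key: TaskInstanceKey) -> None:
        # Forget the attempt once its callback has run and the TI has left
        # the `running` state; a retry gets a new try_number anyway.
        self._in_flight.discard(key)


def count_callback_requests(
    scan_times: List[float],
    processed_at: float,
    tracker: Optional[InFlightCallbackTracker] = None,
) -> int:
    """Count callback requests queued for one attempt. Every scan that runs
    before the first callback is processed still sees the TI as `running`."""
    key = TaskInstanceKey("my_dag", "test", "scheduled__2025-06-19T00:00:00+00:00")
    sent = 0
    for t in scan_times:
        if t >= processed_at:
            break  # TI is no longer `running`, so it is not detected again
        if tracker is None or tracker.should_send(key):
            sent += 1
    return sent


# Scans at 00:41:18, :28, :38 (and a later one); first callback processed at 00:41:43.
print(count_callback_requests([0, 10, 20, 30], processed_at=25))  # 3
print(count_callback_requests([0, 10, 20, 30], processed_at=25, tracker=InFlightCallbackTracker()))  # 1
```

An in-memory set like this only covers a single scheduler process; with several schedulers running, the marker would presumably need to live in shared state such as the metadata database.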
