wolfier opened a new issue, #29455: URL: https://github.com/apache/airflow/issues/29455
### Apache Airflow version

2.5.1

### What happened

When the scheduler [finds zombies](https://github.com/apache/airflow/blob/2.5.1/airflow/jobs/scheduler_job.py#L1541-L1542), a log is emitted indicating how many jobs were found without heartbeats.

```
[2023-01-12T03:26:33.347+0000] {scheduler_job.py:1533} WARNING - Failing (1) jobs without heartbeat after 2023-01-12 03:21:33.317638+00:00
```
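For context, the detection logic at the linked lines boils down to roughly the sketch below (a paraphrased, abridged version of the 2.5.x query, not the verbatim code; `zombie_threshold_secs` stands in for the `scheduler_zombie_task_threshold` config value). A running task instance is flagged when its `LocalTaskJob` either has a stale heartbeat *or* is no longer running, yet the warning only ever names the heartbeat case.

```python
# Paraphrased sketch of the 2.5.x zombie detection query (abridged; see the
# linked scheduler_job.py lines for the exact code).
from datetime import timedelta

from sqlalchemy import or_

from airflow.jobs.local_task_job import LocalTaskJob
from airflow.models.dag import DagModel
from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State, TaskInstanceState

zombie_threshold_secs = 300  # [scheduler] scheduler_zombie_task_threshold
limit_dttm = timezone.utcnow() - timedelta(seconds=zombie_threshold_secs)

with create_session() as session:
    zombies = (
        session.query(TaskInstance, DagModel.fileloc, DagModel.processor_subdir)
        .join(LocalTaskJob, TaskInstance.job_id == LocalTaskJob.id)
        .join(DagModel, TaskInstance.dag_id == DagModel.dag_id)
        .filter(TaskInstance.state == TaskInstanceState.RUNNING)
        # Two distinct causes are folded into a single OR condition here...
        .filter(
            or_(
                LocalTaskJob.state != State.RUNNING,         # the job already exited
                LocalTaskJob.latest_heartbeat < limit_dttm,  # the heartbeat is stale
            )
        )
        .all()
    )

# ...but the warning only ever mentions the heartbeat cause:
# "Failing (N) jobs without heartbeat after <limit_dttm>"
```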
Here is an odd case where a task instance became a zombie right after being executed.

---

The task is scheduled and queued by the scheduler and passed to the executor.

```
[2023-01-12T03:26:14.983+0000] {scheduler_job.py:360} INFO - 1 tasks up for execution:
	<TaskInstance: model_contracts.wait_for_last_product_astro scheduled__2023-01-12T02:00:00+00:00 [scheduled]>
[2023-01-12T03:26:14.984+0000] {scheduler_job.py:511} INFO - Setting the following tasks to queued state:
	<TaskInstance: model_contracts.wait_for_last_product_astro scheduled__2023-01-12T02:00:00+00:00 [scheduled]>
[2023-01-12T03:26:14.991+0000] {base_executor.py:95} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'model_contracts', 'wait_for_last_product_astro', 'scheduled__2023-01-12T02:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/declarative/gusty/hourly/hourly.py']
[2023-01-12T03:26:14.991+0000] {scheduler_job.py:550} INFO - Sending TaskInstanceKey(dag_id='model_contracts', task_id='wait_for_last_product_astro', run_id='scheduled__2023-01-12T02:00:00+00:00', try_number=1, map_index=-1) to executor with priority 15500 and queue default
```

A Celery worker picks up the task instance, assigns it a Celery task id (a UUID), and emits an executor event into the `event_buffer`.

```
[2023-01-12 03:26:15,005: INFO/MainProcess] Task airflow.executors.celery_executor.execute_command[f4242c9e-9426-4b2d-b55c-323731d74e09] received
[2023-01-12 03:26:15,022: INFO/ForkPoolWorker-2] [f4242c9e-9426-4b2d-b55c-323731d74e09] Executing command in Celery: ['airflow', 'tasks', 'run', 'model_contracts', 'wait_for_last_product_astro', 'scheduled__2023-01-12T02:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/declarative/gusty/hourly/hourly.py']
```

The scheduler reads the `event_buffer` and acknowledges the task instance as assigned in Celery.

```
[2023-01-12T03:26:15.145+0000] {scheduler_job.py:635} INFO - Setting external_id for <TaskInstance: model_contracts.wait_for_last_product_astro scheduled__2023-01-12T02:00:00+00:00 [queued]> to f4242c9e-9426-4b2d-b55c-323731d74e09
```

The task instance is marked as a zombie soon after.

```
[2023-01-12T03:26:33.347+0000] {scheduler_job.py:1533} WARNING - Failing (1) jobs without heartbeat after 2023-01-12 03:21:33.317638+00:00
[2023-01-12T03:26:33.355+0000] {scheduler_job.py:1543} ERROR - Detected zombie job: {'full_filepath': '/usr/local/airflow/dags/declarative/gusty/hourly/hourly.py', 'processor_subdir': '/usr/local/airflow/dags', 'msg': "{'DAG Id': 'model_contracts', 'Task Id': 'wait_for_last_product_astro', 'Run Id': 'scheduled__2023-01-12T02:00:00+00:00', 'Hostname': '10.2.124.99', 'External Executor Id': 'f4242c9e-9426-4b2d-b55c-323731d74e09'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f4eb846d760>, 'is_failure_callback': True}
[2023-01-12T03:26:36.044+0000] {taskinstance.py:1774} ERROR - {'DAG Id': 'model_contracts', 'Task Id': 'wait_for_last_product_astro', 'Run Id': 'scheduled__2023-01-12T02:00:00+00:00', 'Hostname': '10.2.124.99', 'External Executor Id': 'f4242c9e-9426-4b2d-b55c-323731d74e09'}
```

Based on the task logs, the task run command never got to the task execution step.

```
[2023-01-12, 03:26:17 UTC] {standard_task_runner.py:83} INFO - Job 4184045: Subtask wait_for_last_product_astro
[2023-01-12, 03:26:22 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 4184045 for task wait_for_last_product_astro ((psycopg2.OperationalError) could not translate host name "geocentric-spacecraft-1886-pgbouncer.geocentric-spacecraft-1886.svc.cluster.local" to address: Temporary failure in name resolution (Background on this error at: https://sqlalche.me/e/14/e3q8); 7131)
[2023-01-12, 03:26:23 UTC] {local_task_job.py:159} INFO - Task exited with return code 1
```

Because the [command execution encountered an exception](https://github.com/apache/airflow/blob/2.5.0/airflow/cli/commands/task_command.py#L383-L389) before the execute method ran, the StandardTaskRunner exited, followed by the LocalTaskJob also exiting in the success state without handling the state of the task instance. At this point the [task instance is still in the running state](https://github.com/apache/airflow/blob/2.5.0/airflow/jobs/local_task_job.py#L88-L97) because the LocalTaskJob had successfully created the StandardTaskRunner. A task instance in the running state with its corresponding LocalTaskJob in the success state means the task instance is now a zombie, but not because of a lack of heartbeats.

### What you think should happen instead

As explained above, not all zombies are caused by missed heartbeats. When a `LocalTaskJob` succeeds or fails while the task instance is still in the running state, the task instance can also become a zombie. While it is true that the LocalTaskJob corresponding to the task instance no longer has a heartbeat, I think it is incorrect to say the LocalTaskJob has had no heartbeats for `scheduler_zombie_task_threshold` seconds, because that implies the LocalTaskJob was producing heartbeats before current time minus `scheduler_zombie_task_threshold` seconds. It would be more accurate to say something like this:

```
Failing (1) jobs without a running LocalTaskJob.
```

The current wording makes more sense for the common case, where the LocalTaskJob is unable to update its heartbeat while it and the task instance are both still in the running state.

```
Failing (1) jobs without heartbeat after <current time minus `scheduler_zombie_task_threshold` seconds>
```

I would like either:

* a separate logging statement for this case to prevent confusion (a rough sketch follows below), or, less preferred,
* a more generic statement indicating that a job is being failed (without specifying "without heartbeat")
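A minimal sketch of the first option, assuming the detection query were extended to report which branch of its OR condition matched; the `job_state` attribute and the exact wording below are hypothetical, not existing code:

```python
from airflow.utils.state import State


def log_zombie_warnings(log, zombies, limit_dttm):
    """Hypothetical split of the single zombie warning into its two causes.

    Assumes each zombie row exposes the LocalTaskJob state as ``job_state``
    (the current query does not return this).
    """
    no_heartbeat = [z for z in zombies if z.job_state == State.RUNNING]
    job_exited = [z for z in zombies if z.job_state != State.RUNNING]

    if no_heartbeat:
        log.warning("Failing (%s) jobs without heartbeat after %s", len(no_heartbeat), limit_dttm)
    if job_exited:
        log.warning("Failing (%s) jobs without a running LocalTaskJob", len(job_exited))
```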
### How to reproduce

This is hard to do, since you need the `airflow tasks run` command to fail before [_run_task_by_selected_method](https://github.com/apache/airflow/blob/2.5.0/airflow/cli/commands/task_command.py#L396) runs.

### Operating System

n/a

### Versions of Apache Airflow Providers

_No response_

### Deployment

Astronomer

### Deployment details

n/a

### Anything else

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)