wolfier opened a new issue, #29455:
URL: https://github.com/apache/airflow/issues/29455

   ### Apache Airflow version
   
   2.5.1
   
   ### What happened
   
   When the scheduler [finds zombies](https://github.com/apache/airflow/blob/2.5.1/airflow/jobs/scheduler_job.py#L1541-L1542), a log is emitted to indicate how many jobs were found without heartbeats.
   
   ```
   [2023-01-12T03:26:33.347+0000] {scheduler_job.py:1533} WARNING - Failing (1) 
jobs without heartbeat after 2023-01-12 03:21:33.317638+00:00
   ```
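   
   For context, the cutoff timestamp in that message is just the current time minus the `[scheduler] scheduler_zombie_task_threshold` setting (300 seconds by default), which lines up with the timestamps above. A minimal sketch, assuming the default threshold:
   
   ```
   # Minimal sketch of how the cutoff in the warning is derived, assuming the
   # default scheduler_zombie_task_threshold of 300 seconds.
   from datetime import datetime, timedelta, timezone
   
   threshold = timedelta(seconds=300)
   now = datetime(2023, 1, 12, 3, 26, 33, 317638, tzinfo=timezone.utc)
   print(now - threshold)  # 2023-01-12 03:21:33.317638+00:00 -- the value logged
   ```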
   
   Below is an odd case where a task instance became a zombie right after it started executing.
   
   ---
   
   The task is scheduled and queued by the scheduler and passed to the executor.
   
   ```
   [2023-01-12T03:26:14.983+0000] {scheduler_job.py:360} INFO - 1 tasks up for 
execution:
       <TaskInstance: model_contracts.wait_for_last_product_astro 
scheduled__2023-01-12T02:00:00+00:00 [scheduled]>
   [2023-01-12T03:26:14.984+0000] {scheduler_job.py:511} INFO - Setting the 
following tasks to queued state:
       <TaskInstance: model_contracts.wait_for_last_product_astro 
scheduled__2023-01-12T02:00:00+00:00 [scheduled]>
   [2023-01-12T03:26:14.991+0000] {base_executor.py:95} INFO - Adding to queue: 
['airflow', 'tasks', 'run', 'model_contracts', 'wait_for_last_product_astro', 
'scheduled__2023-01-12T02:00:00+00:00', '--local', '--subdir', 
'DAGS_FOLDER/declarative/gusty/hourly/hourly.py']
   [2023-01-12T03:26:14.991+0000] {scheduler_job.py:550} INFO - Sending 
TaskInstanceKey(dag_id='model_contracts', 
task_id='wait_for_last_product_astro', 
run_id='scheduled__2023-01-12T02:00:00+00:00', try_number=1, map_index=-1) to 
executor with priority 15500 and queue default
   ```
   
   The Celery worker picks up the task instance, assigns a Celery task id (UUID), and emits an executor event into the event_buffer.
   
   ```
   [2023-01-12 03:26:15,005: INFO/MainProcess] Task 
airflow.executors.celery_executor.execute_command[f4242c9e-9426-4b2d-b55c-323731d74e09]
 received
   [2023-01-12 03:26:15,022: INFO/ForkPoolWorker-2] 
[f4242c9e-9426-4b2d-b55c-323731d74e09] Executing command in Celery: ['airflow', 
'tasks', 'run', 'model_contracts', 'wait_for_last_product_astro', 
'scheduled__2023-01-12T02:00:00+00:00', '--local', '--subdir', 
'DAGS_FOLDER/declarative/gusty/hourly/hourly.py']
   ```
   
   The scheduler reads the event_buffer and acknowledges the task instance as assigned in Celery.
   
   ```
   [2023-01-12T03:26:15.145+0000] {scheduler_job.py:635} INFO - Setting 
external_id for <TaskInstance: model_contracts.wait_for_last_product_astro 
scheduled__2023-01-12T02:00:00+00:00 [queued]> to 
f4242c9e-9426-4b2d-b55c-323731d74e09
   ```
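   
   That log line is the scheduler recording the Celery task id on the task instance. Roughly, as a simplified sketch loosely based on `SchedulerJob._process_executor_events` in 2.5.x (not the exact code):
   
   ```
   # Simplified sketch: on a QUEUED executor event, the scheduler records the
   # Celery task id (the UUID above) on the task instance. Loosely based on
   # SchedulerJob._process_executor_events in 2.5.x; not the exact code.
   import logging
   
   log = logging.getLogger(__name__)
   
   def handle_queued_event(ti, external_id: str) -> None:
       """ti: the TaskInstance the event refers to; external_id: the Celery UUID."""
       log.info("Setting external_id for %s to %s", ti, external_id)
       ti.external_executor_id = external_id  # persisted on the next session flush
   ```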
   
   The task instance is marked as a zombie soon after.
   
   ```
   [2023-01-12T03:26:33.347+0000] {scheduler_job.py:1533} WARNING - Failing (1) 
jobs without heartbeat after 2023-01-12 03:21:33.317638+00:00
   [2023-01-12T03:26:33.355+0000] {scheduler_job.py:1543} ERROR - Detected 
zombie job: {'full_filepath': 
'/usr/local/airflow/dags/declarative/gusty/hourly/hourly.py', 
'processor_subdir': '/usr/local/airflow/dags', 'msg': "{'DAG Id': 
'model_contracts', 'Task Id': 'wait_for_last_product_astro', 'Run Id': 
'scheduled__2023-01-12T02:00:00+00:00', 'Hostname': '10.2.124.99', 'External 
Executor Id': 'f4242c9e-9426-4b2d-b55c-323731d74e09'}", 'simple_task_instance': 
<airflow.models.taskinstance.SimpleTaskInstance object at 0x7f4eb846d760>, 
'is_failure_callback': True}
   [2023-01-12T03:26:36.044+0000] {taskinstance.py:1774} ERROR - {'DAG Id': 
'model_contracts', 'Task Id': 'wait_for_last_product_astro', 'Run Id': 
'scheduled__2023-01-12T02:00:00+00:00', 'Hostname': '10.2.124.99', 'External 
Executor Id': 'f4242c9e-9426-4b2d-b55c-323731d74e09'}
   ```
   
   Based on the task logs, the task run command never reached the actual task execution.
   
   ```
   [2023-01-12, 03:26:17 UTC] {standard_task_runner.py:83} INFO - Job 4184045: 
Subtask wait_for_last_product_astro
   [2023-01-12, 03:26:22 UTC] {standard_task_runner.py:100} ERROR - Failed to 
execute job 4184045 for task wait_for_last_product_astro 
((psycopg2.OperationalError) could not translate host name 
"geocentric-spacecraft-1886-pgbouncer.geocentric-spacecraft-1886.svc.cluster.local"
 to address: Temporary failure in name resolution
   
   (Background on this error at: https://sqlalche.me/e/14/e3q8); 7131)
   [2023-01-12, 03:26:23 UTC] {local_task_job.py:159} INFO - Task exited with 
return code 1
   ```
   
   Since the [command execution encountered an exception](https://github.com/apache/airflow/blob/2.5.0/airflow/cli/commands/task_command.py#L383-L389) before the execute method ran, the StandardTaskRunner exited, followed by the LocalTaskJob also exiting in the success state without handling the state of the task instance. At this point the [task instance's state is running](https://github.com/apache/airflow/blob/2.5.0/airflow/jobs/local_task_job.py#L88-L97) because the LocalTaskJob successfully created the StandardTaskRunner.
   
   A task instance in the running state whose corresponding LocalTaskJob is in the success state is now a zombie, but not because of a lack of heartbeats.
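   
   This is consistent with the zombie check itself: the query behind that warning (simplified below from `_find_zombies` in 2.5.x) flags a running task instance whose job is either *not running anymore* or has a stale heartbeat, so a `LocalTaskJob` that exits while the task instance is still running trips the same heartbeat-worded warning:
   
   ```
   # Simplified sketch of the zombie condition in SchedulerJob._find_zombies
   # (2.5.x). Note the OR: a job that already finished is flagged even though
   # missed heartbeats were never involved.
   from datetime import datetime, timedelta, timezone
   
   def is_zombie(ti_state: str, job_state: str, latest_heartbeat: datetime,
                 threshold_secs: int = 300) -> bool:
       limit_dttm = datetime.now(timezone.utc) - timedelta(seconds=threshold_secs)
       return ti_state == "running" and (
           job_state != "running"             # the case described in this issue
           or latest_heartbeat < limit_dttm   # the "missed heartbeats" case
       )
   ```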
   
   ### What you think should happen instead
   
   As explained above, not all zombies are caused by missed heartbeats. When a `LocalTaskJob` succeeds or fails while the task instance is still in the running state, the task instance can also become a zombie.
   
   While it is true that the LocalTaskJob corresponding to the task instance no longer has a heartbeat, I think it is incorrect to say the LocalTaskJob has had no heartbeats after the `scheduler_zombie_task_threshold` cutoff, because that wording implies the LocalTaskJob was producing heartbeats before current time minus `scheduler_zombie_task_threshold` seconds.
   
   It would be more accurate to say something like this:
   
   ```
   Failing (1) jobs without a running LocalTaskJob.
   ```
   
   The current wording makes more sense for the common case where the LocalTaskJob is unable to update its heartbeat while both it and the task instance are still in the running state:
   
   
   ```
   Failing (1) jobs without heartbeat after <current time minus 
`scheduler_zombie_task_threshold` seconds>
   ```
   
   I would like either:
   
   * a separate logging statement to prevent confusion (see the sketch below), or, less preferred,
   * a more generic statement indicating that a job is being failed (without specifying the missing heartbeat)
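   
   A rough sketch of the first option (all names and message wording here are only illustrative, not actual Airflow code):
   
   ```
   # Hypothetical sketch: split the warning by cause at detection time.
   # `zombies` is assumed to be a list of (task_instance, local_task_job) pairs
   # flagged by the zombie query; none of these names are real Airflow code.
   import logging
   from datetime import datetime
   
   log = logging.getLogger(__name__)
   
   def log_zombies(zombies, limit_dttm: datetime) -> None:
       finished = [z for z in zombies if z[1].state != "running"]
       stale = [z for z in zombies if z[1].state == "running"]
       if finished:
           log.warning("Failing (%d) jobs without a running LocalTaskJob", len(finished))
       if stale:
           log.warning("Failing (%d) jobs without heartbeat after %s", len(stale), limit_dttm)
   ```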
   
   ### How to reproduce
   
   This is hard to reproduce since you need to make the `airflow tasks run` command fail before [_run_task_by_selected_method](https://github.com/apache/airflow/blob/2.5.0/airflow/cli/commands/task_command.py#L396) runs.
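   
   One contrived way to simulate it in a test process (untested sketch; it patches the real `_run_task_by_selected_method` so the run command raises instead of executing the task, standing in for the DNS/DB failure in the worker logs above):
   
   ```
   # Contrived, untested sketch: make `airflow tasks run` fail before the task
   # executes by patching _run_task_by_selected_method to raise, mimicking the
   # psycopg2/DNS failure seen in the worker logs.
   from unittest import mock
   
   from sqlalchemy.exc import OperationalError
   
   patcher = mock.patch(
       "airflow.cli.commands.task_command._run_task_by_selected_method",
       side_effect=OperationalError(
           "SELECT 1", {}, Exception("could not translate host name")
       ),
   )
   patcher.start()  # any task run in this process now fails before executing
   ```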
   
   ### Operating System
   
   n/a
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   n/a
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

