zach-b opened a new issue, #32289:
URL: https://github.com/apache/airflow/issues/32289

   ### Apache Airflow version
   
   2.6.2
   
   ### What happened
   
   We are experiencing the following scenario quite frequently:
   
   1. One of our workers crashes
   2. After some time the task is detected as a zombie by the scheduler, which triggers a `TaskCallbackRequest` to be executed by the DAG file processor.
   3. This detection repeats every 10s until the task has been cleaned up.
   4. The DAG file processor starts, drains the `TaskCallbackRequest` queue, and begins parsing our DAG file.
   5. While it parses our DAG file, the task is still in `running` state and gets detected as a zombie again, adding a new callback to the now-empty queue.
   6. The DAG file processor finishes parsing our DAG file and executes the `TaskCallbackRequest`, setting the task state to `up_for_retry`. At this point the task is no longer detected as a zombie.
   7. The task is retried successfully, the DAG run finishes, and both the task and DAGRun states are `success`.
   8. The DAG file processor runs again, finds the stale `TaskCallbackRequest` that was queued while the previous one was being processed, and executes it, setting the task state back to `up_for_retry`.
   
   **At this point we end up in a weird situation where the DAGRun state is `success` while the task is in state `up_for_retry`.**
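   The sequence above can be sketched as a minimal simulation (plain Python, not Airflow code; the names are illustrative):

   ```python
   queue = []

   def detect_zombies(task, queue):
       # Scheduler side: runs every 10s and re-enqueues a callback for any
       # task that still looks stuck in "running".
       if task["state"] == "running":
           queue.append("TaskCallbackRequest")

   def process_dag_file(task, queue):
       # DAG file processor side: drains the queue first, then spends a while
       # parsing the DAG file before executing the drained requests.
       pending = list(queue)
       queue.clear()
       detect_zombies(task, queue)  # zombie detection fires mid-parse
       for _ in pending:
           task["state"] = "up_for_retry"  # execute the (possibly stale) callback

   task = {"state": "running"}
   detect_zombies(task, queue)    # step 2: first detection enqueues a request
   process_dag_file(task, queue)  # steps 4-6: callback runs, task -> up_for_retry
   task["state"] = "success"      # step 7: retry succeeds, DAG run finishes
   process_dag_file(task, queue)  # step 8: the stale request flips it back
   print(task["state"])           # "up_for_retry" despite a successful DAG run
   ```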
   
   To make things worse, we use `depends_on_past` for our tasks, leading to a deadlock: subsequent DAG runs get stuck waiting for the task to be retried, which of course never happens since its DAGRun state is `success`.
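   To illustrate the deadlock, here is a hand-rolled sketch of the two scheduler checks involved (not Airflow's actual code):

   ```python
   def can_schedule_next_run(prev_ti_state):
       # depends_on_past: the previous try of the same task must have succeeded
       return prev_ti_state == "success"

   def will_be_retried(ti_state, dagrun_state):
       # the scheduler only re-queues up_for_retry tasks in a running DAG run
       return ti_state == "up_for_retry" and dagrun_state == "running"

   # The broken state from the scenario above:
   ti_state, dagrun_state = "up_for_retry", "success"
   print(can_schedule_next_run(ti_state))          # False: next runs are blocked
   print(will_be_retried(ti_state, dagrun_state))  # False: nothing will unblock them
   ```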
   
   ### What you think should happen instead
   
   The `TaskCallbackRequest` should not be executed twice for the same zombie 
task.
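   One possible shape of the fix (a hedged sketch; `should_execute_zombie_callback` and `handle_callback_request` are hypothetical helpers, not Airflow's API): before executing a queued `TaskCallbackRequest`, re-check the task instance's current state and drop the request if the task is no longer running.

   ```python
   RUNNING = "running"

   def should_execute_zombie_callback(ti_state):
       # A zombie failure callback only makes sense while the task instance
       # still looks stuck in RUNNING; anything else means the request is
       # stale (the task was already retried, succeeded, or failed for real).
       return ti_state == RUNNING

   def handle_callback_request(task):
       if not should_execute_zombie_callback(task["state"]):
           return False  # stale request: skip it
       task["state"] = "up_for_retry"
       return True

   # A request queued before the successful retry is now dropped harmlessly:
   task = {"state": "success"}
   print(handle_callback_request(task))  # False
   print(task["state"])                  # still "success"
   ```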
   
   ### How to reproduce
   
   Run the following DAG with the default Airflow 2.6.2 docker-compose setup, then kill the worker while the task is running.
   
   ```python
   from datetime import timedelta, datetime
   import time
   
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   
   # simulate longer processing time
   # (higher than zombie detection interval to be sure the bug will happen)
   time.sleep(15)
   
   def sleep(*args, **kwargs):
       if kwargs["ti"].try_number == 1:
           # sleep for a long time on the first try, to give time to kill the worker
           time.sleep(600)

   with DAG(
       "bug-dag",
       schedule_interval="@hourly",
       start_date=datetime(2023, 6, 30),
       max_active_runs=1,
   ) as dag:
       task = PythonOperator(
           task_id="test",
           python_callable=sleep,
           retries=2,
           # retry quickly to ensure the task has already succeeded when the
           # second clean up occurs
           retry_delay=timedelta(seconds=1),
           depends_on_past=True,
       )
   ```
   <details>
   <summary>Sample logs:</summary>
   Scheduler:
   task is scheduled and starts on a worker:
   
   ```
   2023-06-30 16:31:06 [2023-06-30T14:31:06.953+0000] 
{scheduler_job_runner.py:625} INFO - Sending TaskInstanceKey(dag_id='bug-dag', 
task_id='test', run_id='scheduled__2023-06-30T00:00:00+00:00', try_number=1, 
map_index=-1) to executor with priority 1 and queue default
   ```
   At 16:32, the worker is manually killed while the task runs
   At 16:37:02, the task is detected as zombie:
   ```
   2023-06-30 16:37:02 [2023-06-30T14:37:02.737+0000] 
{scheduler_job_runner.py:1688} WARNING - Failing (1) jobs without heartbeat 
after 2023-06-30 14:32:02.733681+00:00
   2023-06-30 16:37:02 [2023-06-30T14:37:02.739+0000] 
{scheduler_job_runner.py:1698} ERROR - Detected zombie job: {'full_filepath': 
'/opt/airflow/dags/dag.py', 'processor_subdir': '/opt/airflow/dags', 'msg': 
"{'DAG Id': 'bug-dag', 'Task Id': 'test', 'Run Id': 
'scheduled__2023-06-30T00:00:00+00:00', 'Hostname': 'a39eba268731', 'External 
Executor Id': '5c7744d2-60b2-4b9b-92dc-cd1f4cbb3dad'}", 'simple_task_instance': 
<airflow.models.taskinstance.SimpleTaskInstance object at 0xffff9ff6ee10>, 
'is_failure_callback': True}
   ```
   Shortly after, the DAG File processor runs for our `dag.py` file and 
executes the callback, setting the task state to `up_for_retry`:
   ```
   [2023-06-30T14:37:02.763+0000] {processor.py:157} INFO - Started process 
(PID=929) to work on /opt/airflow/dags/dag.py
   [2023-06-30T14:37:02.765+0000] {processor.py:826} INFO - Processing file 
/opt/airflow/dags/dag.py for tasks to queue
   [2023-06-30T14:37:17.814+0000] {logging_mixin.py:149} INFO - 
[2023-06-30T14:37:17.814+0000] {taskinstance.py:1826} ERROR - {'DAG Id': 
'bug-dag', 'Task Id': 'test', 'Run Id': 'scheduled__2023-06-30T00:00:00+00:00', 
'Hostname': 'a39eba268731', 'External Executor Id': 
'5c7744d2-60b2-4b9b-92dc-cd1f4cbb3dad'}
   [2023-06-30T14:37:17.827+0000] {logging_mixin.py:149} INFO - 
[2023-06-30T14:37:17.827+0000] {taskinstance.py:1350} INFO - Marking task as 
UP_FOR_RETRY. dag_id=bug-dag, task_id=test, execution_date=20230630T000000, 
start_date=20230630T143124, end_date=20230630T143717
   [2023-06-30T14:37:17.835+0000] {processor.py:787} INFO - Executed failure 
callback for <TaskInstance: bug-dag.test scheduled__2023-06-30T00:00:00+00:00 
[up_for_retry]> in state up_for_retry
   ```
   While this is running, the task is detected as a zombie again and a new callback is scheduled:
   ```
   2023-06-30 16:37:12 [2023-06-30T14:37:12.831+0000] 
{scheduler_job_runner.py:1688} WARNING - Failing (1) jobs without heartbeat 
after 2023-06-30 14:32:12.827547+00:00
   2023-06-30 16:37:12 [2023-06-30T14:37:12.831+0000] 
{scheduler_job_runner.py:1698} ERROR - Detected zombie job: {'full_filepath': 
'/opt/airflow/dags/dag.py', 'processor_subdir': '/opt/airflow/dags', 'msg': 
"{'DAG Id': 'bug-dag', 'Task Id': 'test', 'Run Id': 
'scheduled__2023-06-30T00:00:00+00:00', 'Hostname': 'a39eba268731', 'External 
Executor Id': '5c7744d2-60b2-4b9b-92dc-cd1f4cbb3dad'}", 'simple_task_instance': 
<airflow.models.taskinstance.SimpleTaskInstance object at 0xffff9ff83890>, 
'is_failure_callback': True}
   ```
   The task is then retried successfully, and the DAGRun state is set to `success` accordingly:
   ```
   2023-06-30 16:37:19 [2023-06-30T14:37:19.133+0000] 
{scheduler_job_runner.py:625} INFO - Sending TaskInstanceKey(dag_id='bug-dag', 
task_id='test', run_id='scheduled__2023-06-30T00:00:00+00:00', try_number=2, 
map_index=-1) to executor with priority 1 and queue default
   2023-06-30 16:37:36 [2023-06-30T14:37:36.205+0000] {dagrun.py:616} INFO - 
Marking run <DagRun bug-dag @ 2023-06-30 00:00:00+00:00: 
scheduled__2023-06-30T00:00:00+00:00, state:running, queued_at: 2023-06-30 
14:31:06.893551+00:00. externally triggered: False> successful
   2023-06-30 16:37:36 [2023-06-30T14:37:36.208+0000] {dagrun.py:682} INFO - 
DagRun Finished: dag_id=bug-dag, execution_date=2023-06-30 00:00:00+00:00, 
run_id=scheduled__2023-06-30T00:00:00+00:00, run_start_date=2023-06-30 
14:31:06.912695+00:00, run_end_date=2023-06-30 14:37:36.207918+00:00, 
run_duration=389.295223, state=success, external_trigger=False, 
run_type=scheduled, data_interval_start=2023-06-30 00:00:00+00:00, 
data_interval_end=2023-06-30 01:00:00+00:00, 
dag_hash=33653846333689491ef96930c7c745e0
   ```
   Afterwards, the DAG File Processor runs again and executes the zombie 
callback a second time:
   ```
   [2023-06-30T14:37:48.745+0000] {processor.py:157} INFO - Started process 
(PID=1024) to work on /opt/airflow/dags/dag.py
   [2023-06-30T14:37:48.747+0000] {processor.py:826} INFO - Processing file 
/opt/airflow/dags/dag.py for tasks to queue
   [2023-06-30T14:38:03.938+0000] {logging_mixin.py:149} INFO - 
[2023-06-30T14:38:03.937+0000] {taskinstance.py:1826} ERROR - {'DAG Id': 
'bug-dag', 'Task Id': 'test', 'Run Id': 'scheduled__2023-06-30T00:00:00+00:00', 
'Hostname': 'a39eba268731', 'External Executor Id': 
'5c7744d2-60b2-4b9b-92dc-cd1f4cbb3dad'}
   [2023-06-30T14:38:03.962+0000] {logging_mixin.py:149} INFO - 
[2023-06-30T14:38:03.962+0000] {taskinstance.py:1350} INFO - Marking task as 
UP_FOR_RETRY. dag_id=bug-dag, task_id=test, execution_date=20230630T000000, 
start_date=20230630T143735, end_date=20230630T143803
   [2023-06-30T14:38:03.972+0000] {processor.py:787} INFO - Executed failure 
callback for <TaskInstance: bug-dag.test scheduled__2023-06-30T00:00:00+00:00 
[up_for_retry]> in state up_for_retry
   ```
   </details>
   
   ### Operating System
   
   docker-compose on MacOS
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

