GitHub user sudarshan2906 added a comment to the discussion: On demand DAG 
parsing

Thanks @potiuk, Airflow 3 looks interesting. 

I still gave it a try in Airflow 2.

I did the following things:
* Set AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR to False
* Whenever there is a change in the DAG repo, I run `airflow dag-processor -n 1 -S "$dag_path"`
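For reference, the trigger loop around that command looks roughly like the sketch below. This is only an illustration of the setup described above; `$DAG_ROOT`, the git-based change detection, and the branch name are assumptions, not part of the original setup.

```shell
#!/usr/bin/env bash
# Sketch: re-parse only the DAG files that changed in the repo.
# Assumptions (not from the original post): the DAG repo is a git checkout
# at $DAG_ROOT, and changes arrive on origin/main.
set -euo pipefail

cd "$DAG_ROOT"
git fetch origin main

# For each changed .py file, run one pass of the DAG processor limited to it:
#   -n 1  run the processor loop exactly once
#   -S    restrict parsing to the given file/subdirectory
git diff --name-only HEAD origin/main -- '*.py' | while read -r dag_path; do
  airflow dag-processor -n 1 -S "$dag_path"
done

git merge --ff-only origin/main
```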

This seems to work for most of the cases that I have tested. However, it doesn't handle zombie tasks: when I intentionally crash the worker, the task stays in the running state forever.

In the scheduler logs, it detects the zombie but never marks it as failed:
```
[2025-03-06T09:05:09.359+0000] {scheduler_job_runner.py:2086} WARNING - Failing 
(1) jobs without heartbeat after 2025-03-06 09:00:09.354996+00:00
[2025-03-06T09:05:09.362+0000] {scheduler_job_runner.py:2110} ERROR - Detected 
zombie job: {'full_filepath': '/opt/airflow/dags/ipa/scheduler_check_dag.py', 
'processor_subdir': '/opt/airflow/dags', 'msg': "{'DAG Id': 'scheduler_check', 
'Task Id': 'sleep', 'Run Id': 'scheduled__2025-03-06T08:58:00+00:00', 
'Hostname': 'cede351dd5eb', 'External Executor Id': 
'fa2e8825-893c-4f23-a6db-97721cefeec0'}", 'simple_task_instance': 
SimpleTaskInstance(dag_id='scheduler_check', task_id='sleep', 
run_id='scheduled__2025-03-06T08:58:00+00:00', map_index=-1, 
start_date=datetime.datetime(2025, 3, 6, 8, 59, 24, 382726, 
tzinfo=Timezone('UTC')), end_date=None, try_number=1, state='running', 
executor=None, executor_config={}, run_as_user=None, pool='default_pool', 
priority_weight=1, queue='default', 
key=TaskInstanceKey(dag_id='scheduler_check', task_id='sleep', 
run_id='scheduled__2025-03-06T08:58:00+00:00', try_number=1, map_index=-1)), 
'task_callback_type': None} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)
[2025-03-06T09:05:09.679+0000] {scheduler_job_runner.py:776} INFO - Received 
executor event with state failed for task instance 
TaskInstanceKey(dag_id='scheduler_check', task_id='sleep', 
run_id='scheduled__2025-03-06T08:58:00+00:00', try_number=1, map_index=-1)
[2025-03-06T09:05:09.683+0000] {scheduler_job_runner.py:813} INFO - 
TaskInstance Finished: dag_id=scheduler_check, task_id=sleep, 
run_id=scheduled__2025-03-06T08:58:00+00:00, map_index=-1, 
run_start_date=2025-03-06 08:59:24.382726+00:00, run_end_date=None, 
run_duration=None, state=running, executor=CeleryExecutor(parallelism=32), 
executor_state=failed, try_number=1, max_tries=0, job_id=9, pool=default_pool, 
queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2025-03-06 
08:59:23.285182+00:00, queued_by_job_id=2, pid=129
[... the same "Failing (1) jobs without heartbeat" / "Detected zombie job" / "Received executor event with state failed" / "TaskInstance Finished: ... state=running" cycle repeats every ~10 seconds, from 09:05:19 through 09:06:00, without the task ever being marked failed ...]
```


GitHub link: 
https://github.com/apache/airflow/discussions/47395#discussioncomment-12411787
