GitHub user sudarshan2906 added a comment to the discussion: On demand DAG parsing
Thanks @potiuk, Airflow 3 looks interesting.
In the meantime, I tried this approach on Airflow 2 with the following setup:
* Set `AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR` to `False`.
* Whenever the DAG repo changes, run `airflow dag-processor -n 1 -S "$dag_path"` to parse the DAGs once.
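A minimal Python sketch of this on-demand loop follows. The post does not say how changes to the DAG repo are detected, so hashing the `.py` files under the DAG folder is an assumption, and `dag_folder_fingerprint`, `dag_processor_argv` and `parse_if_changed` are hypothetical names. Only the command itself comes from the post: `-n 1` asks the DAG processor for a single parsing run before it exits, and `-S` points it at the DAG folder.

```python
import hashlib
import os
import subprocess
from typing import Callable, List, Optional, Sequence


def dag_folder_fingerprint(dag_path: str) -> str:
    """Hypothetical change detector: hash every .py file under dag_path.

    Adding, removing, renaming or editing a DAG file changes the digest;
    non-.py files are ignored.
    """
    digest = hashlib.sha256()
    for root, dirs, files in os.walk(dag_path):
        dirs.sort()  # in-place sort makes os.walk visit subfolders deterministically
        for name in sorted(files):
            if not name.endswith(".py"):
                continue
            full_path = os.path.join(root, name)
            digest.update(os.path.relpath(full_path, dag_path).encode())
            digest.update(b"\0")  # separate the path from the file contents
            with open(full_path, "rb") as fh:
                digest.update(fh.read())
            digest.update(b"\0")
    return digest.hexdigest()


def dag_processor_argv(dag_path: str) -> List[str]:
    # The command from the post: one parsing run (-n 1) over one folder (-S).
    return ["airflow", "dag-processor", "-n", "1", "-S", dag_path]


def parse_if_changed(
    dag_path: str,
    last_fingerprint: Optional[str],
    runner: Callable[[Sequence[str]], object] = lambda argv: subprocess.run(argv, check=True),
) -> str:
    """Run the DAG processor once if the folder changed; return the new fingerprint."""
    current = dag_folder_fingerprint(dag_path)
    if current != last_fingerprint:
        runner(dag_processor_argv(dag_path))
    return current
```

Calling `parse_if_changed(dag_path, last)` from a Git hook or a polling loop and storing the returned fingerprint avoids re-parsing an unchanged folder; the `runner` parameter exists only so the subprocess call can be swapped out in tests.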
This works for most of the cases I have tested. However, zombie tasks are not handled: when I intentionally crash the worker, the task stays in the `running` state forever.
The scheduler logs show that it detects the zombie roughly every 10 seconds but never marks it as failed. The executor reports `executor_state=failed`, yet the task instance remains at `state=running`:
```
[2025-03-06T09:05:09.359+0000] {scheduler_job_runner.py:2086} WARNING - Failing
(1) jobs without heartbeat after 2025-03-06 09:00:09.354996+00:00
[2025-03-06T09:05:09.362+0000] {scheduler_job_runner.py:2110} ERROR - Detected
zombie job: {'full_filepath': '/opt/airflow/dags/ipa/scheduler_check_dag.py',
'processor_subdir': '/opt/airflow/dags', 'msg': "{'DAG Id': 'scheduler_check',
'Task Id': 'sleep', 'Run Id': 'scheduled__2025-03-06T08:58:00+00:00',
'Hostname': 'cede351dd5eb', 'External Executor Id':
'fa2e8825-893c-4f23-a6db-97721cefeec0'}", 'simple_task_instance':
SimpleTaskInstance(dag_id='scheduler_check', task_id='sleep',
run_id='scheduled__2025-03-06T08:58:00+00:00', map_index=-1,
start_date=datetime.datetime(2025, 3, 6, 8, 59, 24, 382726,
tzinfo=Timezone('UTC')), end_date=None, try_number=1, state='running',
executor=None, executor_config={}, run_as_user=None, pool='default_pool',
priority_weight=1, queue='default',
key=TaskInstanceKey(dag_id='scheduler_check', task_id='sleep',
run_id='scheduled__2025-03-06T08:58:00+00:00', try_number=1, map_index=-1)),
'task_callback_type': None} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)
[2025-03-06T09:05:09.679+0000] {scheduler_job_runner.py:776} INFO - Received
executor event with state failed for task instance
TaskInstanceKey(dag_id='scheduler_check', task_id='sleep',
run_id='scheduled__2025-03-06T08:58:00+00:00', try_number=1, map_index=-1)
[2025-03-06T09:05:09.683+0000] {scheduler_job_runner.py:813} INFO -
TaskInstance Finished: dag_id=scheduler_check, task_id=sleep,
run_id=scheduled__2025-03-06T08:58:00+00:00, map_index=-1,
run_start_date=2025-03-06 08:59:24.382726+00:00, run_end_date=None,
run_duration=None, state=running, executor=CeleryExecutor(parallelism=32),
executor_state=failed, try_number=1, max_tries=0, job_id=9, pool=default_pool,
queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2025-03-06
08:59:23.285182+00:00, queued_by_job_id=2, pid=129
[2025-03-06T09:05:19.542+0000] {scheduler_job_runner.py:2086} WARNING - Failing
(1) jobs without heartbeat after 2025-03-06 09:00:19.538039+00:00
[2025-03-06T09:05:19.543+0000] {scheduler_job_runner.py:2110} ERROR - Detected
zombie job: {'full_filepath': '/opt/airflow/dags/ipa/scheduler_check_dag.py',
'processor_subdir': '/opt/airflow/dags', 'msg': "{'DAG Id': 'scheduler_check',
'Task Id': 'sleep', 'Run Id': 'scheduled__2025-03-06T08:58:00+00:00',
'Hostname': 'cede351dd5eb', 'External Executor Id':
'fa2e8825-893c-4f23-a6db-97721cefeec0'}", 'simple_task_instance':
SimpleTaskInstance(dag_id='scheduler_check', task_id='sleep',
run_id='scheduled__2025-03-06T08:58:00+00:00', map_index=-1,
start_date=datetime.datetime(2025, 3, 6, 8, 59, 24, 382726,
tzinfo=Timezone('UTC')), end_date=None, try_number=1, state='running',
executor=None, executor_config={}, run_as_user=None, pool='default_pool',
priority_weight=1, queue='default',
key=TaskInstanceKey(dag_id='scheduler_check', task_id='sleep',
run_id='scheduled__2025-03-06T08:58:00+00:00', try_number=1, map_index=-1)),
'task_callback_type': None} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)
[2025-03-06T09:05:20.040+0000] {scheduler_job_runner.py:776} INFO - Received
executor event with state failed for task instance
TaskInstanceKey(dag_id='scheduler_check', task_id='sleep',
run_id='scheduled__2025-03-06T08:58:00+00:00', try_number=1, map_index=-1)
[2025-03-06T09:05:20.045+0000] {scheduler_job_runner.py:813} INFO -
TaskInstance Finished: dag_id=scheduler_check, task_id=sleep,
run_id=scheduled__2025-03-06T08:58:00+00:00, map_index=-1,
run_start_date=2025-03-06 08:59:24.382726+00:00, run_end_date=None,
run_duration=None, state=running, executor=CeleryExecutor(parallelism=32),
executor_state=failed, try_number=1, max_tries=0, job_id=9, pool=default_pool,
queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2025-03-06
08:59:23.285182+00:00, queued_by_job_id=2, pid=129
[2025-03-06T09:05:29.685+0000] {scheduler_job_runner.py:2086} WARNING - Failing
(1) jobs without heartbeat after 2025-03-06 09:00:29.682020+00:00
[2025-03-06T09:05:29.687+0000] {scheduler_job_runner.py:2110} ERROR - Detected
zombie job: {'full_filepath': '/opt/airflow/dags/ipa/scheduler_check_dag.py',
'processor_subdir': '/opt/airflow/dags', 'msg': "{'DAG Id': 'scheduler_check',
'Task Id': 'sleep', 'Run Id': 'scheduled__2025-03-06T08:58:00+00:00',
'Hostname': 'cede351dd5eb', 'External Executor Id':
'fa2e8825-893c-4f23-a6db-97721cefeec0'}", 'simple_task_instance':
SimpleTaskInstance(dag_id='scheduler_check', task_id='sleep',
run_id='scheduled__2025-03-06T08:58:00+00:00', map_index=-1,
start_date=datetime.datetime(2025, 3, 6, 8, 59, 24, 382726,
tzinfo=Timezone('UTC')), end_date=None, try_number=1, state='running',
executor=None, executor_config={}, run_as_user=None, pool='default_pool',
priority_weight=1, queue='default',
key=TaskInstanceKey(dag_id='scheduler_check', task_id='sleep',
run_id='scheduled__2025-03-06T08:58:00+00:00', try_number=1, map_index=-1)),
'task_callback_type': None} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)
[2025-03-06T09:05:30.353+0000] {scheduler_job_runner.py:776} INFO - Received
executor event with state failed for task instance
TaskInstanceKey(dag_id='scheduler_check', task_id='sleep',
run_id='scheduled__2025-03-06T08:58:00+00:00', try_number=1, map_index=-1)
[2025-03-06T09:05:30.357+0000] {scheduler_job_runner.py:813} INFO -
TaskInstance Finished: dag_id=scheduler_check, task_id=sleep,
run_id=scheduled__2025-03-06T08:58:00+00:00, map_index=-1,
run_start_date=2025-03-06 08:59:24.382726+00:00, run_end_date=None,
run_duration=None, state=running, executor=CeleryExecutor(parallelism=32),
executor_state=failed, try_number=1, max_tries=0, job_id=9, pool=default_pool,
queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2025-03-06
08:59:23.285182+00:00, queued_by_job_id=2, pid=129
[2025-03-06T09:05:39.739+0000] {scheduler_job_runner.py:2086} WARNING - Failing
(1) jobs without heartbeat after 2025-03-06 09:00:39.736792+00:00
[2025-03-06T09:05:39.740+0000] {scheduler_job_runner.py:2110} ERROR - Detected
zombie job: {'full_filepath': '/opt/airflow/dags/ipa/scheduler_check_dag.py',
'processor_subdir': '/opt/airflow/dags', 'msg': "{'DAG Id': 'scheduler_check',
'Task Id': 'sleep', 'Run Id': 'scheduled__2025-03-06T08:58:00+00:00',
'Hostname': 'cede351dd5eb', 'External Executor Id':
'fa2e8825-893c-4f23-a6db-97721cefeec0'}", 'simple_task_instance':
SimpleTaskInstance(dag_id='scheduler_check', task_id='sleep',
run_id='scheduled__2025-03-06T08:58:00+00:00', map_index=-1,
start_date=datetime.datetime(2025, 3, 6, 8, 59, 24, 382726,
tzinfo=Timezone('UTC')), end_date=None, try_number=1, state='running',
executor=None, executor_config={}, run_as_user=None, pool='default_pool',
priority_weight=1, queue='default',
key=TaskInstanceKey(dag_id='scheduler_check', task_id='sleep',
run_id='scheduled__2025-03-06T08:58:00+00:00', try_number=1, map_index=-1)),
'task_callback_type': None} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)
[2025-03-06T09:05:40.644+0000] {scheduler_job_runner.py:776} INFO - Received
executor event with state failed for task instance
TaskInstanceKey(dag_id='scheduler_check', task_id='sleep',
run_id='scheduled__2025-03-06T08:58:00+00:00', try_number=1, map_index=-1)
[2025-03-06T09:05:40.648+0000] {scheduler_job_runner.py:813} INFO -
TaskInstance Finished: dag_id=scheduler_check, task_id=sleep,
run_id=scheduled__2025-03-06T08:58:00+00:00, map_index=-1,
run_start_date=2025-03-06 08:59:24.382726+00:00, run_end_date=None,
run_duration=None, state=running, executor=CeleryExecutor(parallelism=32),
executor_state=failed, try_number=1, max_tries=0, job_id=9, pool=default_pool,
queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2025-03-06
08:59:23.285182+00:00, queued_by_job_id=2, pid=129
[2025-03-06T09:05:49.884+0000] {scheduler_job_runner.py:2086} WARNING - Failing
(1) jobs without heartbeat after 2025-03-06 09:00:49.881131+00:00
[2025-03-06T09:05:49.886+0000] {scheduler_job_runner.py:2110} ERROR - Detected
zombie job: {'full_filepath': '/opt/airflow/dags/ipa/scheduler_check_dag.py',
'processor_subdir': '/opt/airflow/dags', 'msg': "{'DAG Id': 'scheduler_check',
'Task Id': 'sleep', 'Run Id': 'scheduled__2025-03-06T08:58:00+00:00',
'Hostname': 'cede351dd5eb', 'External Executor Id':
'fa2e8825-893c-4f23-a6db-97721cefeec0'}", 'simple_task_instance':
SimpleTaskInstance(dag_id='scheduler_check', task_id='sleep',
run_id='scheduled__2025-03-06T08:58:00+00:00', map_index=-1,
start_date=datetime.datetime(2025, 3, 6, 8, 59, 24, 382726,
tzinfo=Timezone('UTC')), end_date=None, try_number=1, state='running',
executor=None, executor_config={}, run_as_user=None, pool='default_pool',
priority_weight=1, queue='default',
key=TaskInstanceKey(dag_id='scheduler_check', task_id='sleep',
run_id='scheduled__2025-03-06T08:58:00+00:00', try_number=1, map_index=-1)),
'task_callback_type': None} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)
[2025-03-06T09:05:50.985+0000] {scheduler_job_runner.py:776} INFO - Received
executor event with state failed for task instance
TaskInstanceKey(dag_id='scheduler_check', task_id='sleep',
run_id='scheduled__2025-03-06T08:58:00+00:00', try_number=1, map_index=-1)
[2025-03-06T09:05:50.991+0000] {scheduler_job_runner.py:813} INFO -
TaskInstance Finished: dag_id=scheduler_check, task_id=sleep,
run_id=scheduled__2025-03-06T08:58:00+00:00, map_index=-1,
run_start_date=2025-03-06 08:59:24.382726+00:00, run_end_date=None,
run_duration=None, state=running, executor=CeleryExecutor(parallelism=32),
executor_state=failed, try_number=1, max_tries=0, job_id=9, pool=default_pool,
queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2025-03-06
08:59:23.285182+00:00, queued_by_job_id=2, pid=129
[2025-03-06T09:06:00.047+0000] {scheduler_job_runner.py:2086} WARNING - Failing
(1) jobs without heartbeat after 2025-03-06 09:01:00.043111+00:00
[2025-03-06T09:06:00.049+0000] {scheduler_job_runner.py:2110} ERROR - Detected
zombie job: {'full_filepath': '/opt/airflow/dags/ipa/scheduler_check_dag.py',
'processor_subdir': '/opt/airflow/dags', 'msg': "{'DAG Id': 'scheduler_check',
'Task Id': 'sleep', 'Run Id': 'scheduled__2025-03-06T08:58:00+00:00',
'Hostname': 'cede351dd5eb', 'External Executor Id':
'fa2e8825-893c-4f23-a6db-97721cefeec0'}", 'simple_task_instance':
SimpleTaskInstance(dag_id='scheduler_check', task_id='sleep',
run_id='scheduled__2025-03-06T08:58:00+00:00', map_index=-1,
start_date=datetime.datetime(2025, 3, 6, 8, 59, 24, 382726,
tzinfo=Timezone('UTC')), end_date=None, try_number=1, state='running',
executor=None, executor_config={}, run_as_user=None, pool='default_pool',
priority_weight=1, queue='default',
key=TaskInstanceKey(dag_id='scheduler_check', task_id='sleep',
run_id='scheduled__2025-03-06T08:58:00+00:00', try_number=1, map_index=-1)),
'task_callback_type': None} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)
```
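The timestamps in this log can be checked with a little arithmetic. Each `Failing (1) jobs without heartbeat after <cutoff>` line is logged about 300 seconds after its cutoff, which is consistent with Airflow 2's default `[scheduler] scheduler_zombie_task_threshold` of 300 seconds, and successive checks are about 10 seconds apart, consistent with the default `[scheduler] zombie_detection_interval` of 10 seconds. The sketch below computes both values; the helper name `zombie_check_timings` is hypothetical, and it assumes each log record sits on a single line, as in the scheduler's own log file, rather than wrapped as in this email.

```python
import re
from datetime import datetime
from typing import List, Tuple

# Matches the scheduler's WARNING record, e.g.
# [2025-03-06T09:05:09.359+0000] {...} WARNING - Failing (1) jobs without heartbeat after 2025-03-06 09:00:09.354996+00:00
_FAILING_RE = re.compile(
    r"^\[(?P<logged>[^\]]+)\].*Failing \(\d+\) jobs without heartbeat after (?P<cutoff>\S+ \S+)"
)


def zombie_check_timings(lines: List[str]) -> Tuple[List[float], List[float]]:
    """Return (thresholds, intervals) in seconds for every zombie check found.

    threshold: time the record was logged minus the heartbeat cutoff it prints.
    interval: time between two consecutive zombie checks.
    """
    logged_times = []
    thresholds = []
    for line in lines:
        match = _FAILING_RE.search(line)
        if not match:
            continue  # ignore ERROR/INFO records and anything else
        logged = datetime.strptime(match["logged"], "%Y-%m-%dT%H:%M:%S.%f%z")
        cutoff = datetime.fromisoformat(match["cutoff"])
        thresholds.append((logged - cutoff).total_seconds())
        logged_times.append(logged)
    intervals = [
        (later - earlier).total_seconds()
        for earlier, later in zip(logged_times, logged_times[1:])
    ]
    return thresholds, intervals
```

Applied to the six WARNING records above, every threshold comes out at about 300.00 seconds and the intervals range from about 10.05 to 10.18 seconds, so the scheduler keeps re-detecting the same zombie on every check.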
GitHub link:
https://github.com/apache/airflow/discussions/47395#discussioncomment-12411787