mewa opened a new issue, #43644:
URL: https://github.com/apache/airflow/issues/43644
### Apache Airflow version
Other Airflow 2 version (please specify below)
### If "Other Airflow 2 version" selected, which one?
2.8.3
### What happened?
I'm creating a new issue for this since we've reproduced it on a newer
version than the one mentioned in #21842 (cc @potiuk).
We have the same setup as in the issue above:
* we use KubernetesExecutor with AWS spot instances
* all tasks have their retries set to at least 1
What happens is that when a spot instance is terminated, the task's pod is
killed by Kubernetes, but the Airflow scheduler never tries to reschedule the
task. In this particular case, the pod didn't even manage to start before the
node initiated its shutdown.
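For context, here is a minimal sketch of the kind of task definition we run (the DAG/task names and the retry delay below are illustrative, not our actual code); every task has `retries` set to at least 1 and runs under KubernetesExecutor:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Illustrative only: mirrors our setup, where every task has retries >= 1
# and runs under KubernetesExecutor on AWS spot instances.
with DAG(
    dag_id="example_spot_retry",           # hypothetical dag_id
    schedule="@daily",
    start_date=datetime(2024, 11, 1),
    catchup=False,
    default_args={
        "retries": 1,                       # at least one retry on every task
        "retry_delay": timedelta(minutes=5),
    },
):
    PythonOperator(
        task_id="example_task",             # hypothetical task_id
        python_callable=lambda: None,
    )
```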
Here's an excerpt from the logs:
```
[2024-11-04
01:17:32,249][PID:7][INFO][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
Received executor event with state failed for task instance
TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>',
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1)
[2024-11-04
01:17:28,807][PID:7][ERROR][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
Executor reports task instance <TaskInstance: <dag_id>.<task_id>
scheduled__2024-11-03T00:00:00+00:00 [queued]> finished (failed) although the
task says it's queued. (Info: None) Was the task killed externally?
[2024-11-04T01:17:28.783+0000] {kubernetes_executor_utils.py:269} ERROR -
Event: <pod_name> Failed, annotations: <omitted>
```
And here are related logs over the lifespan of the task:
```
[2024-11-04
01:17:32,269][PID:7][ERROR][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
Executor reports task instance <TaskInstance: <dag_id>.<task_id>
scheduled__2024-11-03T00:00:00+00:00 [queued]> finished (failed) although the
task says it's queued. (Info: None) Was the task killed externally?
[2024-11-04T01:17:32.269+0000] {task_context_logger.py:91} ERROR - Executor
reports task instance <TaskInstance: <dag_id>.<task_id>
scheduled__2024-11-03T00:00:00+00:00 [queued]> finished (failed) although the
task says it's queued. (Info: None) Was the task killed externally?
[2024-11-04
01:17:32,268][PID:7][INFO][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
TaskInstance Finished: dag_id=<dag_id>, task_id=<task_id>,
run_id=scheduled__2024-11-03T00:00:00+00:00, map_index=-1, run_start_date=None,
run_end_date=None, run_duration=None, state=queued, executor_state=failed,
try_number=1, max_tries=1, job_id=None, pool=small_tasks, queue=default,
priority_weight=12, operator=S17PythonOperator, queued_dttm=2024-11-04
01:17:13.321215+00:00, queued_by_job_id=2782000, pid=None
[2024-11-04T01:17:32.268+0000] {scheduler_job_runner.py:733} INFO -
TaskInstance Finished: dag_id=<dag_id>, task_id=<task_id>,
run_id=scheduled__2024-11-03T00:00:00+00:00, map_index=-1, run_start_date=None,
run_end_date=None, run_duration=None, state=queued, executor_state=failed,
try_number=1, max_tries=1, job_id=None, pool=small_tasks, queue=default,
priority_weight=12, operator=S17PythonOperator, queued_dttm=2024-11-04
01:17:13.321215+00:00, queued_by_job_id=2782000, pid=None
[2024-11-04
01:17:32,249][PID:7][INFO][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
Received executor event with state failed for task instance
TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>',
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1)
[2024-11-04T01:17:32.249+0000] {scheduler_job_runner.py:696} INFO - Received
executor event with state failed for task instance
TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>',
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1)
[2024-11-04
01:17:31,405][PID:7][INFO][airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
Changing state of (TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>',
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1),
<TaskInstanceState.FAILED: 'failed'>, '<pod_name>', 'airflow', '649610440') to
failed
[2024-11-04T01:17:31.405+0000] {kubernetes_executor.py:401} INFO - Changing
state of (TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>',
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1),
<TaskInstanceState.FAILED: 'failed'>, '<pod_name>', 'airflow', '649610440') to
failed
[2024-11-04
01:17:31,404][PID:7][INFO][airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
Changing state of (TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>',
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1),
<TaskInstanceState.FAILED: 'failed'>, '<pod_name>', 'airflow', '649610439') to
failed
[2024-11-04T01:17:31.404+0000] {kubernetes_executor.py:401} INFO - Changing
state of (TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>',
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1),
<TaskInstanceState.FAILED: 'failed'>, '<pod_name>', 'airflow', '649610439') to
failed
[2024-11-04
01:17:31,403][PID:7][INFO][airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
Changing state of (TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>',
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1),
<TaskInstanceState.FAILED: 'failed'>, '<pod_name>', 'airflow', '649610438') to
failed
[2024-11-04T01:17:31.403+0000] {kubernetes_executor.py:401} INFO - Changing
state of (TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>',
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1),
<TaskInstanceState.FAILED: 'failed'>, '<pod_name>', 'airflow', '649610438') to
failed
[2024-11-04
01:17:28,807][PID:7][ERROR][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
Executor reports task instance <TaskInstance: <dag_id>.<task_id>
scheduled__2024-11-03T00:00:00+00:00 [queued]> finished (failed) although the
task says it's queued. (Info: None) Was the task killed externally?
[2024-11-04T01:17:28.807+0000] {task_context_logger.py:91} ERROR - Executor
reports task instance <TaskInstance: <dag_id>.<task_id>
scheduled__2024-11-03T00:00:00+00:00 [queued]> finished (failed) although the
task says it's queued. (Info: None) Was the task killed externally?
[2024-11-04
01:17:28,806][PID:7][INFO][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
TaskInstance Finished: dag_id=<dag_id>, task_id=<task_id>,
run_id=scheduled__2024-11-03T00:00:00+00:00, map_index=-1, run_start_date=None,
run_end_date=None, run_duration=None, state=queued, executor_state=failed,
try_number=1, max_tries=1, job_id=None, pool=small_tasks, queue=default,
priority_weight=12, operator=S17PythonOperator, queued_dttm=2024-11-04
01:17:13.321215+00:00, queued_by_job_id=2782000, pid=None
[2024-11-04T01:17:28.806+0000] {scheduler_job_runner.py:733} INFO -
TaskInstance Finished: dag_id=<dag_id>, task_id=<task_id>,
run_id=scheduled__2024-11-03T00:00:00+00:00, map_index=-1, run_start_date=None,
run_end_date=None, run_duration=None, state=queued, executor_state=failed,
try_number=1, max_tries=1, job_id=None, pool=small_tasks, queue=default,
priority_weight=12, operator=S17PythonOperator, queued_dttm=2024-11-04
01:17:13.321215+00:00, queued_by_job_id=2782000, pid=None
[2024-11-04
01:17:28,790][PID:7][INFO][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
Received executor event with state failed for task instance
TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>',
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1)
[2024-11-04T01:17:28.790+0000] {scheduler_job_runner.py:696} INFO - Received
executor event with state failed for task instance
TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>',
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1)
[2024-11-04T01:17:28.783+0000] {kubernetes_executor_utils.py:269} ERROR -
Event: <pod_name> Failed, annotations: <omitted>
[2024-11-04
01:17:27,748][PID:7][INFO][airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
Changing state of (TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>',
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1),
<TaskInstanceState.FAILED: 'failed'>, '<pod_name>', 'airflow', '649610402') to
failed
[2024-11-04T01:17:27.748+0000] {kubernetes_executor.py:401} INFO - Changing
state of (TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>',
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1),
<TaskInstanceState.FAILED: 'failed'>, '<pod_name>', 'airflow', '649610402') to
failed
[2024-11-04
01:17:22,381][PID:7][INFO][airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler]
Creating kubernetes pod for job is TaskInstanceKey(dag_id='<dag_id>',
task_id='<task_id>', run_id='scheduled__2024-11-03T00:00:00+00:00',
try_number=1, map_index=-1), with pod name <pod_name>, annotations: <omitted>
[2024-11-04T01:17:22.381+0000] {kubernetes_executor_utils.py:431} INFO -
Creating kubernetes pod for job is TaskInstanceKey(dag_id='<dag_id>',
task_id='<task_id>', run_id='scheduled__2024-11-03T00:00:00+00:00',
try_number=1, map_index=-1), with pod name <pod_name>, annotations: <omitted>
[2024-11-04
01:17:14,893][PID:7][INFO][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
Setting external_id for <TaskInstance: <dag_id>.<task_id>
scheduled__2024-11-03T00:00:00+00:00 [queued]> to 2782000
[2024-11-04T01:17:14.893+0000] {scheduler_job_runner.py:723} INFO - Setting
external_id for <TaskInstance: <dag_id>.<task_id>
scheduled__2024-11-03T00:00:00+00:00 [queued]> to 2782000
[2024-11-04
01:17:14,877][PID:7][INFO][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
Received executor event with state queued for task instance
TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>',
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1)
[2024-11-04T01:17:14.877+0000] {scheduler_job_runner.py:696} INFO - Received
executor event with state queued for task instance
TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>',
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1)
[2024-11-04
01:17:13,334][PID:7][INFO][airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
Add task TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>',
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1) with
command ['airflow', 'tasks', 'run', '<dag_id>', '<task_id>',
'scheduled__2024-11-03T00:00:00+00:00', '--local', '--subdir',
'DAGS_FOLDER/pai_non_renewable_energy-v1_dag.zip/pai_non_renewable_energy_dag.py']
[2024-11-04T01:17:13.334+0000] {kubernetes_executor.py:357} INFO - Add task
TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>',
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1) with
command ['airflow', 'tasks', 'run', '<dag_id>', '<task_id>',
'scheduled__2024-11-03T00:00:00+00:00', '--local', '--subdir', '<dags_dir>']
[2024-11-04
01:17:13,327][PID:7][INFO][airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
Adding to queue: ['airflow', 'tasks', 'run', '<dag_id>', '<task_id>',
'scheduled__2024-11-03T00:00:00+00:00', '--local', '--subdir',
'DAGS_FOLDER/pai_non_renewable_energy-v1_dag.zip/pai_non_renewable_energy_dag.py']
[2024-11-04T01:17:13.327+0000] {base_executor.py:146} INFO - Adding to
queue: ['airflow', 'tasks', 'run', '<dag_id>', '<task_id>',
'scheduled__2024-11-03T00:00:00+00:00', '--local', '--subdir',
'DAGS_FOLDER/pai_non_renewable_energy-v1_dag.zip/pai_non_renewable_energy_dag.py']
[2024-11-04
01:17:13,326][PID:7][INFO][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
Sending TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>',
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1) to
executor with priority 12 and queue default
[2024-11-04T01:17:13.326+0000] {scheduler_job_runner.py:646} INFO - Sending
TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>',
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1) to
executor with priority 12 and queue default
[2024-11-04T01:17:13.353Z] <TaskInstance: <dag_id>.<task_id>
scheduled__2024-11-03T00:00:00+00:00 [scheduled]>
[2024-11-04T01:17:13.353Z], <TaskInstance: <dag_id>.<task_id>
scheduled__2024-11-03T00:00:00+00:00 [scheduled]>
[2024-11-04T01:17:13.352Z], <TaskInstance: <dag_id>.<task_id>
scheduled__2024-11-03T00:00:00+00:00 [scheduled]>
[2024-11-04T01:17:13.352Z], <TaskInstance: <dag_id>.<task_id>
scheduled__2024-11-03T00:00:00+00:00 [scheduled]>
```
### What you think should happen instead?
The Airflow scheduler should retry the task according to its configured retry
policy instead of marking it as failed. Since every task has `retries` set to
at least 1 (the logs above show `try_number=1, max_tries=1`), at least one
more attempt should have been scheduled.
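Conceptually, the retry decision comes down to comparing the attempt number with the maximum number of tries. The snippet below is only a rough illustration of that check in plain Python (not the actual scheduler code); with the values from the logs above (`try_number=1`, `max_tries=1`), a retry would still be expected:
```python
def should_retry(try_number: int, max_tries: int) -> bool:
    """Rough illustration of retry eligibility, not Airflow's implementation.

    With retries=1, a task instance ends up with max_tries=1, i.e. up to
    two attempts in total.
    """
    return try_number <= max_tries


# Values taken from the "TaskInstance Finished" log lines above.
print(should_retry(try_number=1, max_tries=1))  # True -> a retry was expected
```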
### How to reproduce
Terminate the node running the task's pod (e.g. via a spot-instance
interruption) before the task process has started.
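A crude way to approximate this from the Kubernetes side is to delete the Node object hosting the worker pod (a real AWS spot interruption terminates the underlying EC2 instance instead, but the pod is killed either way). The sketch below uses the Kubernetes Python client, assumes cluster-admin access, and treats the pod name as a known placeholder:
```python
from kubernetes import client, config

config.load_kube_config()
v1 = client.CoreV1Api()

# Find the node that the Airflow worker pod was scheduled onto.
pod = v1.read_namespaced_pod(name="<pod_name>", namespace="airflow")
node_name = pod.spec.node_name

# Deleting the Node object makes Kubernetes fail/evict the pods running on it,
# which is roughly what happens when the spot instance is reclaimed.
v1.delete_node(name=node_name)
```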
### Operating System
`apache/airflow:2.8.3-python3.10` docker image
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)