mewa opened a new issue, #43644:
URL: https://github.com/apache/airflow/issues/43644

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.8.3
   
   ### What happened?
   
   I'm opening a new issue because we've reproduced this on a newer 
version than the one mentioned in #21842 (cc @potiuk).
   
   We have the same setup as in the issue above:
   * we use KubernetesExecutor with AWS spot instances
   * all tasks have their retries set to at least 1
   
   What happens is that when a spot instance is terminated, the task pod is 
killed by Kubernetes, but the Airflow scheduler never tries to reschedule the task.
   
   In this particular case, the pod didn't manage to start before the node 
initiated shutdown.
   
   Here's an excerpt from the logs:
   
   ```
   [2024-11-04 
01:17:32,249][PID:7][INFO][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
 Received executor event with state failed for task instance 
TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>', 
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1)
   [2024-11-04 
01:17:28,807][PID:7][ERROR][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
 Executor reports task instance <TaskInstance: <dag_id>.<task_id> 
scheduled__2024-11-03T00:00:00+00:00 [queued]> finished (failed) although the 
task says it's queued. (Info: None) Was the task killed externally?
   [2024-11-04T01:17:28.783+0000] {kubernetes_executor_utils.py:269} ERROR - 
Event: <pod_name> Failed, annotations: <omitted>
   ```
   
   And here are related logs over the lifespan of the task:
   
   ```
   [2024-11-04 
01:17:32,269][PID:7][ERROR][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
 Executor reports task instance <TaskInstance: <dag_id>.<task_id> 
scheduled__2024-11-03T00:00:00+00:00 [queued]> finished (failed) although the 
task says it's queued. (Info: None) Was the task killed externally?
   [2024-11-04T01:17:32.269+0000] {task_context_logger.py:91} ERROR - Executor 
reports task instance <TaskInstance: <dag_id>.<task_id> 
scheduled__2024-11-03T00:00:00+00:00 [queued]> finished (failed) although the 
task says it's queued. (Info: None) Was the task killed externally?
   [2024-11-04 
01:17:32,268][PID:7][INFO][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
 TaskInstance Finished: dag_id=<dag_id>, task_id=<task_id>, 
run_id=scheduled__2024-11-03T00:00:00+00:00, map_index=-1, run_start_date=None, 
run_end_date=None, run_duration=None, state=queued, executor_state=failed, 
try_number=1, max_tries=1, job_id=None, pool=small_tasks, queue=default, 
priority_weight=12, operator=S17PythonOperator, queued_dttm=2024-11-04 
01:17:13.321215+00:00, queued_by_job_id=2782000, pid=None
   [2024-11-04T01:17:32.268+0000] {scheduler_job_runner.py:733} INFO - 
TaskInstance Finished: dag_id=<dag_id>, task_id=<task_id>, 
run_id=scheduled__2024-11-03T00:00:00+00:00, map_index=-1, run_start_date=None, 
run_end_date=None, run_duration=None, state=queued, executor_state=failed, 
try_number=1, max_tries=1, job_id=None, pool=small_tasks, queue=default, 
priority_weight=12, operator=S17PythonOperator, queued_dttm=2024-11-04 
01:17:13.321215+00:00, queued_by_job_id=2782000, pid=None
   [2024-11-04 
01:17:32,249][PID:7][INFO][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
 Received executor event with state failed for task instance 
TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>', 
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1)
   [2024-11-04T01:17:32.249+0000] {scheduler_job_runner.py:696} INFO - Received 
executor event with state failed for task instance 
TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>', 
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1)
   [2024-11-04 
01:17:31,405][PID:7][INFO][airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
 Changing state of (TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>', 
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1), 
<TaskInstanceState.FAILED: 'failed'>, '<pod_name>', 'airflow', '649610440') to 
failed
   [2024-11-04T01:17:31.405+0000] {kubernetes_executor.py:401} INFO - Changing 
state of (TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>', 
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1), 
<TaskInstanceState.FAILED: 'failed'>, '<pod_name>', 'airflow', '649610440') to 
failed
   [2024-11-04 
01:17:31,404][PID:7][INFO][airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
 Changing state of (TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>', 
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1), 
<TaskInstanceState.FAILED: 'failed'>, '<pod_name>', 'airflow', '649610439') to 
failed
   [2024-11-04T01:17:31.404+0000] {kubernetes_executor.py:401} INFO - Changing 
state of (TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>', 
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1), 
<TaskInstanceState.FAILED: 'failed'>, '<pod_name>', 'airflow', '649610439') to 
failed
   [2024-11-04 
01:17:31,403][PID:7][INFO][airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
 Changing state of (TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>', 
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1), 
<TaskInstanceState.FAILED: 'failed'>, '<pod_name>', 'airflow', '649610438') to 
failed
   [2024-11-04T01:17:31.403+0000] {kubernetes_executor.py:401} INFO - Changing 
state of (TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>', 
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1), 
<TaskInstanceState.FAILED: 'failed'>, '<pod_name>', 'airflow', '649610438') to 
failed
   [2024-11-04 
01:17:28,807][PID:7][ERROR][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
 Executor reports task instance <TaskInstance: <dag_id>.<task_id> 
scheduled__2024-11-03T00:00:00+00:00 [queued]> finished (failed) although the 
task says it's queued. (Info: None) Was the task killed externally?
   [2024-11-04T01:17:28.807+0000] {task_context_logger.py:91} ERROR - Executor 
reports task instance <TaskInstance: <dag_id>.<task_id> 
scheduled__2024-11-03T00:00:00+00:00 [queued]> finished (failed) although the 
task says it's queued. (Info: None) Was the task killed externally?
   [2024-11-04 
01:17:28,806][PID:7][INFO][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
 TaskInstance Finished: dag_id=<dag_id>, task_id=<task_id>, 
run_id=scheduled__2024-11-03T00:00:00+00:00, map_index=-1, run_start_date=None, 
run_end_date=None, run_duration=None, state=queued, executor_state=failed, 
try_number=1, max_tries=1, job_id=None, pool=small_tasks, queue=default, 
priority_weight=12, operator=S17PythonOperator, queued_dttm=2024-11-04 
01:17:13.321215+00:00, queued_by_job_id=2782000, pid=None
   [2024-11-04T01:17:28.806+0000] {scheduler_job_runner.py:733} INFO - 
TaskInstance Finished: dag_id=<dag_id>, task_id=<task_id>, 
run_id=scheduled__2024-11-03T00:00:00+00:00, map_index=-1, run_start_date=None, 
run_end_date=None, run_duration=None, state=queued, executor_state=failed, 
try_number=1, max_tries=1, job_id=None, pool=small_tasks, queue=default, 
priority_weight=12, operator=S17PythonOperator, queued_dttm=2024-11-04 
01:17:13.321215+00:00, queued_by_job_id=2782000, pid=None
   [2024-11-04 
01:17:28,790][PID:7][INFO][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
 Received executor event with state failed for task instance 
TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>', 
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1)
   [2024-11-04T01:17:28.790+0000] {scheduler_job_runner.py:696} INFO - Received 
executor event with state failed for task instance 
TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>', 
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1)
   [2024-11-04T01:17:28.783+0000] {kubernetes_executor_utils.py:269} ERROR - 
Event: <pod_name> Failed, annotations: <omitted>
   [2024-11-04 
01:17:27,748][PID:7][INFO][airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
 Changing state of (TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>', 
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1), 
<TaskInstanceState.FAILED: 'failed'>, '<pod_name>', 'airflow', '649610402') to 
failed
   [2024-11-04T01:17:27.748+0000] {kubernetes_executor.py:401} INFO - Changing 
state of (TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>', 
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1), 
<TaskInstanceState.FAILED: 'failed'>, '<pod_name>', 'airflow', '649610402') to 
failed
   [2024-11-04 
01:17:22,381][PID:7][INFO][airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler]
 Creating kubernetes pod for job is TaskInstanceKey(dag_id='<dag_id>', 
task_id='<task_id>', run_id='scheduled__2024-11-03T00:00:00+00:00', 
try_number=1, map_index=-1), with pod name <pod_name>, annotations: <omitted>
   [2024-11-04T01:17:22.381+0000] {kubernetes_executor_utils.py:431} INFO - 
Creating kubernetes pod for job is TaskInstanceKey(dag_id='<dag_id>', 
task_id='<task_id>', run_id='scheduled__2024-11-03T00:00:00+00:00', 
try_number=1, map_index=-1), with pod name <pod_name>, annotations: <omitted>
   [2024-11-04 
01:17:14,893][PID:7][INFO][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
 Setting external_id for <TaskInstance: <dag_id>.<task_id> 
scheduled__2024-11-03T00:00:00+00:00 [queued]> to 2782000
   [2024-11-04T01:17:14.893+0000] {scheduler_job_runner.py:723} INFO - Setting 
external_id for <TaskInstance: <dag_id>.<task_id> 
scheduled__2024-11-03T00:00:00+00:00 [queued]> to 2782000
   [2024-11-04 
01:17:14,877][PID:7][INFO][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
 Received executor event with state queued for task instance 
TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>', 
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1)
   [2024-11-04T01:17:14.877+0000] {scheduler_job_runner.py:696} INFO - Received 
executor event with state queued for task instance 
TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>', 
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1)
   [2024-11-04 
01:17:13,334][PID:7][INFO][airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
 Add task TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>', 
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1) with 
command ['airflow', 'tasks', 'run', '<dag_id>', '<task_id>', 
'scheduled__2024-11-03T00:00:00+00:00', '--local', '--subdir', 
'DAGS_FOLDER/pai_non_renewable_energy-v1_dag.zip/pai_non_renewable_energy_dag.py']
   [2024-11-04T01:17:13.334+0000] {kubernetes_executor.py:357} INFO - Add task 
TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>', 
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1) with 
command ['airflow', 'tasks', 'run', '<dag_id>', '<task_id>', 
'scheduled__2024-11-03T00:00:00+00:00', '--local', '--subdir', '<dags_dir>']
   [2024-11-04 
01:17:13,327][PID:7][INFO][airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
 Adding to queue: ['airflow', 'tasks', 'run', '<dag_id>', '<task_id>', 
'scheduled__2024-11-03T00:00:00+00:00', '--local', '--subdir', 
'DAGS_FOLDER/pai_non_renewable_energy-v1_dag.zip/pai_non_renewable_energy_dag.py']
   [2024-11-04T01:17:13.327+0000] {base_executor.py:146} INFO - Adding to 
queue: ['airflow', 'tasks', 'run', '<dag_id>', '<task_id>', 
'scheduled__2024-11-03T00:00:00+00:00', '--local', '--subdir', 
'DAGS_FOLDER/pai_non_renewable_energy-v1_dag.zip/pai_non_renewable_energy_dag.py']
   [2024-11-04 
01:17:13,326][PID:7][INFO][airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
 Sending TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>', 
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1) to 
executor with priority 12 and queue default
   [2024-11-04T01:17:13.326+0000] {scheduler_job_runner.py:646} INFO - Sending 
TaskInstanceKey(dag_id='<dag_id>', task_id='<task_id>', 
run_id='scheduled__2024-11-03T00:00:00+00:00', try_number=1, map_index=-1) to 
executor with priority 12 and queue default
   [2024-11-04T01:17:13.353Z] <TaskInstance: <dag_id>.<task_id> 
scheduled__2024-11-03T00:00:00+00:00 [scheduled]>
   [2024-11-04T01:17:13.353Z], <TaskInstance: <dag_id>.<task_id> 
scheduled__2024-11-03T00:00:00+00:00 [scheduled]>
   [2024-11-04T01:17:13.352Z], <TaskInstance: <dag_id>.<task_id> 
scheduled__2024-11-03T00:00:00+00:00 [scheduled]>
   [2024-11-04T01:17:13.352Z], <TaskInstance: <dag_id>.<task_id> 
scheduled__2024-11-03T00:00:00+00:00 [scheduled]>
   ```
   
   ### What you think should happen instead?
   
   The Airflow scheduler should retry the task according to its configured retry 
policy rather than marking it as failed.
   
   ### How to reproduce
   
   Run a task with `retries` set to at least 1 on the KubernetesExecutor and 
terminate the node hosting the task pod (in our case, an AWS spot interruption) 
before the pod manages to start. A sketch of a minimal DAG for this is below.
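   
   Here's a minimal DAG sketch that matches our setup (the DAG id, schedule, and 
task body are illustrative, not our actual pipeline; the only important part is 
that `retries` is at least 1):
   
   ```python
   import time
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   
   
   def do_work():
       # Placeholder workload; in our case the pod was killed before the task
       # even started, so the task body is irrelevant.
       time.sleep(60)
   
   
   # Every task has retries >= 1, yet when the node hosting the pod is
   # terminated while the task is still queued/starting, the scheduler marks
   # the task as failed instead of retrying it.
   with DAG(
       dag_id="spot_interruption_repro",
       start_date=datetime(2024, 11, 1),
       schedule="@daily",
       catchup=False,
       default_args={
           "retries": 1,
           "retry_delay": timedelta(minutes=5),
       },
   ):
       PythonOperator(task_id="do_work", python_callable=do_work)
   ```
   
   Run a DAG like this on the KubernetesExecutor and terminate the node the task 
pod is scheduled onto: the scheduler logs the "Was the task killed externally?" 
error and marks the task as failed without attempting a retry.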
   
   ### Operating System
   
   the `apache/airflow:2.8.3-python3.10` Docker image
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

