vilozio opened a new issue, #26773:
URL: https://github.com/apache/airflow/issues/26773

   ### Apache Airflow version
   
   Other Airflow 2 version
   
   ### What happened
   
   We have Airflow (2.2.0) deployed to Kubernetes (1.21.14-gke.700) with the official Airflow Helm Chart (1.6.0). The database is PostgreSQL.
   
   We use the KubernetesExecutor.
   
   Recently we upgraded Airflow to 2.3.4, the latest version at that time (I had been waiting for this release because of the fix in https://github.com/apache/airflow/pull/24356).
   
   Initially the upgrade went fine, but then we noticed that DAGs just stopped running new tasks. The tasks are stuck in the queued state. Actions such as clearing or deleting a run, or triggering a manual run, don't help. Only restarting the Airflow scheduler helps, but then after one day (we have only daily jobs) the tasks are stuck again.
   
   After a scheduler restart, the next run of the tasks finishes successfully, but on the third day they are stuck again.
   
   I have logs and some ideas about why this could happen; I describe them below.
   
   ### What you think should happen instead
   
   Tasks run successfully at the scheduled time.
   
   ### How to reproduce
   
   Upgrade an Airflow 2.2.0 deployment with existing DAGs to version 2.3.4.
   
   ### Operating System
   
   Windows 11
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
    - Airflow 2.2.0 
    - Official Airflow Helm Chart 1.6.0
    - Kubernetes 1.21.14-gke.700 
    - PostgreSQL 13
    - executor: KubernetesExecutor
   
   ### Anything else
   
   This problem occurs every third day. We have downgraded Airflow back to 2.2.0 until we find the cause.
   
   The scheduler writes something like this to its logs (I kept the entries for only one DAG task; there are 27 other DAG tasks):
   ```
   [2022-09-14T08:01:30.656+0000] {dagbag.py:472} DEBUG - Loaded DAG <DAG: MY_DAG_1>
   [2022-09-14T08:01:30.746+0000] {processor.py:650} INFO - DAG(s) dict_keys(['MY_DAG_1']) retrieved from /opt/airflow/dags/create_dags_from_configs.py
   [2022-09-14T08:01:30.747+0000] {dagbag.py:619} DEBUG - Running dagbag.sync_to_db with retries. Try 1 of 3
   [2022-09-14T08:01:30.747+0000] {dagbag.py:624} DEBUG - Calling the DAG.bulk_sync_to_db method
   [2022-09-14T08:01:30.966+0000] {scheduler_job.py:354} INFO - 28 tasks up for execution:
    <TaskInstance: MY_DAG_1.main scheduled__2022-09-13T04:00:00+00:00 [scheduled]>
    ...
   [2022-09-14T08:01:31.037+0000] {scheduler_job.py:547} INFO - Sending TaskInstanceKey(dag_id='MY_DAG_1', task_id='main', run_id='scheduled__2022-09-13T04:00:00+00:00', try_number=1, map_index=-1) to executor with priority 1 and queue default
   [2022-09-14T08:01:31.038+0000] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'MY_DAG_1', 'main', 'scheduled__2022-09-13T04:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/create_dags_from_configs.py']
   ...
   [2022-09-14T08:01:31.054+0000] {base_executor.py:159} DEBUG - 28 running task instances
   [2022-09-14T08:01:31.055+0000] {base_executor.py:160} DEBUG - 28 in queue
   [2022-09-14T08:01:31.055+0000] {base_executor.py:161} DEBUG - 36 open slots
   ```
   
   Then, for every DAG task, these error messages appear in the logs:
   ```
   [2022-09-12T16:32:45.654+0000] {kubernetes_executor.py:469} INFO - Found 2 queued task instances
   [2022-09-12T16:32:45.673+0000] {kubernetes_executor.py:510} INFO - TaskInstance: <TaskInstance: MY_DAG_1.main scheduled__2022-08-03T07:00:00+00:00 [queued]> found in queued state but was not launched, rescheduling
   [2022-09-12T16:32:46.336+0000] {scheduler_job.py:419} INFO - DAG MY_DAG_1 has 1/16 running and queued tasks
   [2022-09-12T16:32:46.337+0000] {scheduler_job.py:505} INFO - Setting the following tasks to queued state:
   <TaskInstance: MY_DAG_1.main scheduled__2022-08-03T07:00:00+00:00 [scheduled]>
   [2022-09-12T16:32:49.649+0000] {base_executor.py:215} ERROR - could not queue task TaskInstanceKey(dag_id='MY_DAG_1', task_id='main', run_id='scheduled__2022-08-03T07:00:00+00:00', try_number=1, map_index=-1) (still running after 4 attempts)
   ```
   
   This repeats until I restart the scheduler and delete all the tasks so that they are rescheduled.
   
   At first I thought this situation was related to https://github.com/apache/airflow/issues/21225, but that issue uses Celery rather than the KubernetesExecutor.
   
   I also suspect that this error may be caused by the changes in https://github.com/apache/airflow/pull/23016, since that PR changed the trigger logic for Celery but not for the KubernetesExecutor.
   
   All the symptoms are similar to those in https://github.com/apache/airflow/issues/25728, where it appears that the task key was never removed from `self.running` after the task initially rescheduled itself. The same seems to happen in our case with the KubernetesExecutor.
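To make the suspected mechanism concrete, here is a toy model in Python. It is a minimal sketch under stated assumptions, not Airflow's actual `BaseExecutor` or `KubernetesExecutor` code: `ToyExecutor`, `TaskKey`, `reschedule_not_launched`, `simulate` and `MAX_ATTEMPTS` are hypothetical names, and the retry policy is modeled as a plain attempt counter (the real retry policy may differ). The only behavior it encodes is the hypothesis above: if a task is rescheduled without its key being removed from `running`, every later attempt to queue it is refused until the task is dropped with a "could not queue task ... still running" error.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import NamedTuple

# Hypothetical: chosen to mirror "still running after 4 attempts" in the logs.
MAX_ATTEMPTS = 4


class TaskKey(NamedTuple):
    """Hypothetical stand-in for Airflow's TaskInstanceKey."""

    dag_id: str
    task_id: str
    run_id: str
    try_number: int = 1
    map_index: int = -1


@dataclass
class ToyExecutor:
    """Toy model of the suspected bug; NOT Airflow's real executor code."""

    running: set = field(default_factory=set)
    queued: dict = field(default_factory=dict)
    attempts: defaultdict = field(default_factory=lambda: defaultdict(int))
    launched: list = field(default_factory=list)
    errors: list = field(default_factory=list)

    def queue(self, key: TaskKey) -> None:
        # Scheduler side: "Setting the following tasks to queued state".
        self.queued[key] = ["airflow", "tasks", "run", key.dag_id, key.task_id, key.run_id]

    def trigger_tasks(self) -> None:
        # Called once per scheduler heartbeat in this model.
        for key in list(self.queued):
            if key in self.running:
                # The executor still thinks the task is running, so it will
                # not launch it again; it only counts the failed attempt.
                self.attempts[key] += 1
                if self.attempts[key] < MAX_ATTEMPTS:
                    continue
                self.errors.append(
                    f"could not queue task {key} "
                    f"(still running after {self.attempts[key]} attempts)"
                )
                del self.attempts[key]
                del self.queued[key]
            else:
                del self.queued[key]
                self.running.add(key)
                self.launched.append(key)

    def reschedule_not_launched(self, key: TaskKey, buggy: bool) -> None:
        # Models "found in queued state but was not launched, rescheduling".
        # Hypothesis from the issue: in the buggy case the key stays in
        # self.running, so the executor keeps treating the task as running.
        if not buggy:
            self.running.discard(key)


def simulate(buggy: bool) -> ToyExecutor:
    ex = ToyExecutor()
    key = TaskKey("MY_DAG_1", "main", "scheduled__2022-08-03T07:00:00+00:00")
    ex.queue(key)
    ex.trigger_tasks()  # first launch: key is added to running
    # The pod never starts, so the task is reset and queued again.
    ex.reschedule_not_launched(key, buggy=buggy)
    ex.queue(key)
    for _ in range(MAX_ATTEMPTS):
        ex.trigger_tasks()
    return ex


if __name__ == "__main__":
    for flag in (True, False):
        ex = simulate(buggy=flag)
        print(f"buggy={flag}: launched {len(ex.launched)} time(s), errors={ex.errors}")
```

With `buggy=True` the task is launched once and then dropped with a single "could not queue task" error, matching the log pattern above; with `buggy=False` the rescheduled task is simply launched a second time.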
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

