MichailKaramanos opened a new issue, #34624:
URL: https://github.com/apache/airflow/issues/34624
### Apache Airflow version
2.7.1
### What happened
In our company, after we started leveraging the new triggerer process in a
specific Airflow instance, tasks began getting stuck in the queued state at
random intervals, almost every day.
The only difference between this instance and the others is that it has a DAG
with `max_active_tis_per_dag=1` configured on a `deferrable`-enabled task, to
avoid having multiple runs of the same task instance across all DAG runs.
- The first strange behaviour we noticed is that this limit was not being
respected: when some delay was introduced between DAG runs, multiple task
instances were spawned and deferred.
- The second behaviour was that tasks started getting stuck in the queued
state almost every day, at random intervals.
- The third behaviour was that our alerting started bouncing constantly
between the QUEUED <-> SCHEDULED states.
- Last but not least, the scheduler logs started looping, indicating that the
concurrency limit had been reached, such as: `Not executing
<TaskInstance: ... [scheduled]> since the task concurrency for this task has
been reached.`
### What you think should happen instead
Instead, the scheduler logs were flooded with the following:
```
[2023-09-25T17:07:23.698+0000] {scheduler_job_runner.py:527} INFO - Not executing <TaskInstance: ... scheduled__2023-09-25T08:00:00+00:00 [scheduled]> since the task concurrency for this task has been reached.
[2023-09-25T17:07:23.698+0000] {scheduler_job_runner.py:479} INFO - DAG ... has 31/180 running and queued tasks
```
### How to reproduce
A DAG with the following characteristics:
- `max_active_runs=4`
- an hourly `schedule_interval`
- A deferrable task with `max_active_tis_per_dag=1`
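A minimal DAG sketch with those characteristics might look like the following
(the `dag_id` and the choice of `TimeDeltaSensorAsync` are illustrative; any
deferrable operator should reproduce the behaviour):
```
from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.time_delta import TimeDeltaSensorAsync

with DAG(
    dag_id="deferred_concurrency_repro",
    start_date=datetime(2023, 9, 1),
    schedule_interval="@hourly",
    max_active_runs=4,
    catchup=True,
) as dag:
    # Deferrable task limited to one active instance across all DAG runs.
    wait = TimeDeltaSensorAsync(
        task_id="wait",
        delta=timedelta(minutes=30),
        max_active_tis_per_dag=1,
    )
```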
### Operating System
Debian GNU/Linux 11 (bullseye)
### Versions of Apache Airflow Providers
_No response_
### Deployment
Docker-Compose
### Deployment details
AKS
### Anything else
After digging into the code, this seems to be what is happening:
- The task `EXECUTION_STATES` structure does not include the DEFERRED state:
https://github.com/apache/airflow/blob/main/airflow/ti_deps/dependencies_states.py#L21
- When using pools, on the other hand, this state is accounted for by
extending the task `EXECUTION_STATES` with DEFERRED:
https://github.com/apache/airflow/blob/main/airflow/models/pool.py#L184
- This structure is used to apply the concurrency limits here:
https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job_runner.py#L342
- The resulting `concurrency_map` gets filled with deferred task instances
that have already completed, so the task is treated as starved and the
scheduler refuses to execute any more instances, even though those deferred
runs already finished:
https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job_runner.py#L530
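The effect on the `max_active_tis_per_dag` check can be sketched with a small
standalone simulation of the scheduler's counting logic (the names mirror the
Airflow code, but this is a simplified model, not the actual implementation):
```
from enum import Enum


class TaskInstanceState(str, Enum):
    SCHEDULED = "scheduled"
    QUEUED = "queued"
    RUNNING = "running"
    DEFERRED = "deferred"


# Current definition: DEFERRED is missing.
EXECUTION_STATES = {TaskInstanceState.RUNNING, TaskInstanceState.QUEUED}


def can_schedule(task_states, execution_states, max_active_tis_per_dag=1):
    """Simplified concurrency check: count instances of the task whose
    state is in execution_states and compare against the limit."""
    concurrency = sum(1 for s in task_states if s in execution_states)
    return concurrency < max_active_tis_per_dag


# One DAG run already holds a deferred instance of the task.
states = [TaskInstanceState.DEFERRED]

# Without DEFERRED in EXECUTION_STATES, the scheduler sees 0 active
# instances and starts another one, violating the limit.
print(can_schedule(states, EXECUTION_STATES))  # True

# With the proposed fix, the deferred instance is counted and the
# limit holds.
print(can_schedule(states, EXECUTION_STATES | {TaskInstanceState.DEFERRED}))  # False
```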
To sum up, the fix would be to add the DEFERRED state to the
`EXECUTION_STATES` structure:
```
EXECUTION_STATES = {
    TaskInstanceState.RUNNING,
    TaskInstanceState.QUEUED,
    TaskInstanceState.DEFERRED,
}
```
so that it behaves the same way as pools.
In turn, the pool logic should follow the same pattern and consume that
`EXECUTION_STATES` structure directly, instead of building the union by hand
as it does today:
```
allowed_execution_states = EXECUTION_STATES | {
    TaskInstanceState.DEFERRED,
}
```
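As a sanity check of that refactor (again a simplified model, not the real
Airflow objects): once DEFERRED is part of `EXECUTION_STATES`, the pool-side
union adds nothing, so both call sites can safely share the one structure.
```
from enum import Enum


class TaskInstanceState(str, Enum):
    QUEUED = "queued"
    RUNNING = "running"
    DEFERRED = "deferred"


# EXECUTION_STATES with the proposed fix applied.
EXECUTION_STATES = {
    TaskInstanceState.RUNNING,
    TaskInstanceState.QUEUED,
    TaskInstanceState.DEFERRED,
}

# The union the pool code currently builds by hand.
allowed_execution_states = EXECUTION_STATES | {TaskInstanceState.DEFERRED}

# After the fix, the union is a no-op, so the pool can simply reuse
# EXECUTION_STATES and both code paths stay in sync.
print(allowed_execution_states == EXECUTION_STATES)  # True
```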
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)