wolfier commented on issue #23824:
URL: https://github.com/apache/airflow/issues/23824#issuecomment-1145346608
I was able to replicate this issue with the following DAGs on my local
machine with LocalExecutor.
```python
from airflow import DAG
from airflow.sensors.time_delta import TimeDeltaSensorAsync
from datetime import datetime, timedelta
# Generate 10 DAGs, each running every minute with max_active_runs=1 and
# ten async sensors that defer and then fire immediately.
for x in range(10):
    _dag_id = f"sense_dag_{x}"
    task_list = []

    with DAG(
        dag_id=_dag_id,
        schedule_interval="*/1 * * * *",
        start_date=datetime(2022, 1, 1),
        max_active_runs=1,
        # catchup=False,
    ) as dag:
        for y in range(10):
            task_list.append(
                TimeDeltaSensorAsync(
                    task_id=f"sense_{y}",
                    delta=timedelta(seconds=0),
                    retries=3,
                    retry_delay=timedelta(seconds=5),
                )
            )

    globals()[_dag_id] = dag
```
This issue is easier to spot when there are multiple retries, because the
task log indicating that the task cannot run is recorded under the next try
(the try number has already been incremented). Another clear indicator is the
output from scheduler_job.py, where the executor reports the task instance as
finished while the scheduler still sees it as queued:
```
[2022-05-27 16:55:43,906] {scheduler_job.py:669} ERROR - Executor reports
task instance <TaskInstance:
telescope_processor.live_nation.historic_sense_live_nation_gcs_objects
scheduled__2022-05-03T00:00:00+00:00 [queued]> finished (success) although the
task says its queued. (Info: None) Was the task killed externally?
```
I believe the purpose of the check is to ensure that the task is not
executed again before the scheduler acknowledges the event sent by the
executor, which is only sent when a task execution completes, whether it
succeeded or failed. The check prevents tasks from getting stuck in the queued
state; this behaviour is outlined in AIRFLOW-1641 and fixed in #2715.
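As a rough illustration, here is a minimal, hypothetical stand-in for that
check (the real logic lives in SchedulerJob._process_executor_events; the
function name, signature, and plain-string states below are mine, not
Airflow's):
```python
# Simplified, hypothetical stand-in for the scheduler-side check; names and
# states are illustrative only.

def process_executor_event(ti_key, ti_state, executor_state, info=None):
    """Fail a task the executor reports as finished while the scheduler
    still sees it as queued, i.e. one that apparently never started."""
    if executor_state in ("success", "failed") and ti_state == "queued":
        print(
            f"Executor reports task instance {ti_key} finished "
            f"({executor_state}) although the task says its queued. "
            f"(Info: {info}) Was the task killed externally?"
        )
        return "fail_or_retry"  # handed to the normal retry machinery
    return "ok"

# A deferred task caught mid-race trips the check:
process_executor_event(("sense_dag_0", "sense_0"), "queued", "success")
```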
With the introduction of triggerers, there is a scenario where a task isn't
actually stuck in queued: it is queued again legitimately, because it was
moved back to the scheduled state once its trigger instance completed.
This is roughly how the task instance state flows for deferred tasks (see
the sketch after this list).
1. The celery execution of the task instance completes with celery state
success
([source](https://github.com/apache/airflow/blob/2.3.0/airflow/executors/celery_executor.py#L392-L393)).
2. The trigger instance is assigned to a triggerer
([source](https://github.com/apache/airflow/blob/2.3.0/airflow/models/trigger.py#L186-L194)).
3. The trigger instance completes and the task instance state is set to
scheduled
([source](https://github.com/apache/airflow/blob/2.3.0/airflow/models/trigger.py#L109-L124)).
4. The task instance key is removed from the running set and an executor
event is added to the event buffer with task state success
([source](https://github.com/apache/airflow/blob/2.3.0/airflow/executors/base_executor.py#L261)),
because the celery execution returned with celery state success.
5. The scheduler processes the executor event of the deferred task instance
(now in the scheduled state from step 3) with the success task state (from
step 4).
6. The task instance follows the task lifecycle as normal, from scheduled ->
queued.
7. The task is executed in a worker.
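Here is the sketch referenced above: a toy model of the normal ordering,
using plain dicts to stand in for the executor's event buffer and the
scheduler's view of the task instance state (all names are illustrative, not
Airflow APIs):
```python
# Toy model of the normal (non-racy) ordering of the steps above.

event_buffer = {}  # ti_key -> terminal state reported by the executor
ti_state = {}      # ti_key -> state as the scheduler currently sees it

key = ("sense_dag_0", "sense_0", "scheduled__2022-01-01T00:00:00+00:00")

event_buffer[key] = "success"   # steps 1 & 4: the deferring task run exits
                                # cleanly, so the executor buffers "success"
ti_state[key] = "scheduled"     # step 3: the trigger fires, TI re-scheduled

# Step 5: the scheduler drains the buffer while the TI is scheduled, so the
# stuck-in-queued check does not fire.
assert not (event_buffer.pop(key) == "success" and ti_state[key] == "queued")

ti_state[key] = "queued"        # step 6: normal scheduled -> queued
ti_state[key] = "running"       # step 7: picked up by a worker
```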
However, given how long it can take for the scheduler to reach
executor-event processing in its loop, steps 5 and 6 can be flipped. This
means the task instance is in the queued state when the scheduler processes
the executor event of the deferred task instance. Since the state is queued,
the condition is met, and the task is marked as failed / up for retry.
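Replaying the toy model with step 6 ahead of step 5 reproduces the error
(again, every name here is illustrative):
```python
# Same toy model, but the TI is re-queued before the scheduler drains the
# executor's event buffer.

event_buffer = {}  # ti_key -> terminal state reported by the executor
ti_state = {}      # ti_key -> state as the scheduler currently sees it

key = ("sense_dag_0", "sense_0", "scheduled__2022-01-01T00:00:00+00:00")

event_buffer[key] = "success"   # steps 1 & 4
ti_state[key] = "scheduled"     # step 3
ti_state[key] = "queued"        # step 6 happens first this time

# Step 5 (late): the TI is queued when the event is processed, which looks
# identical to a task that died before it ever started.
if event_buffer.pop(key) == "success" and ti_state[key] == "queued":
    print("Was the task killed externally? -> marked failed / up for retry")
```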