wolfier commented on issue #23824:
URL: https://github.com/apache/airflow/issues/23824#issuecomment-1145346608

   I was able to replicate this issue with the following DAGs on my local machine with the LocalExecutor.
   
   ```python
   from airflow import DAG
   from airflow.sensors.time_delta import TimeDeltaSensorAsync
   from datetime import datetime, timedelta
   
   
   for x in range(10):
       _dag_id = f"sense_dag_{x}"
       task_list = []
       with DAG(
           dag_id=_dag_id,
           schedule_interval="*/1 * * * *",
           start_date=datetime(2022, 1, 1),
           max_active_runs=1,
           # catchup=False,
       ) as dag:
   
           for y in range(10):
               task_list.append(
                   TimeDeltaSensorAsync(
                       task_id=f"sense_{y}",
                       delta=timedelta(seconds=0),
                       retries=3,
                       retry_delay=timedelta(seconds=5)
                   )
               )
   
       globals()[_dag_id] = dag
   ```
   
   This issue is easier to spot when there are multiple retries, because the task log saying the task could not run is recorded under the next try (the try number has already been incremented). Another clear indicator is the output from scheduler_job.py, where the task instance state is expected to be a completed state but is found to be queued.
   
   ```
   [2022-05-27 16:55:43,906] {scheduler_job.py:669} ERROR - Executor reports task instance <TaskInstance: telescope_processor.live_nation.historic_sense_live_nation_gcs_objects scheduled__2022-05-03T00:00:00+00:00 [queued]> finished (success) although the task says its queued. (Info: None) Was the task killed externally?
   ```
   
   I believe the purpose of the check is to ensure that the task is not executed again before the scheduler acknowledges the event sent by the executor, which is only emitted once a task execution completes, whether it succeeded or failed. The check exists to prevent tasks from getting stuck in the queued state. This behaviour is outlined in AIRFLOW-1641 and was fixed in #2715.
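
   For reference, the check lives in SchedulerJob._process_executor_events. The sketch below is my simplified paraphrase of it (2.3.x), not the exact code: session handling, metrics and callback requests are omitted, and `fetch_task_instance` is a stand-in for the actual DB query.

   ```python
   from airflow.utils.state import TaskInstanceState

   def process_executor_events(event_buffer, fetch_task_instance):
       for ti_key, (state, info) in event_buffer.items():
           if state not in (TaskInstanceState.SUCCESS, TaskInstanceState.FAILED):
               continue
           ti = fetch_task_instance(ti_key)
           # The executor says this try finished, but the task instance row is
           # still "queued" for the same try number, so the scheduler assumes the
           # task was killed before it ever ran and fails it / puts it up for retry.
           if ti.try_number == ti_key.try_number and ti.state == TaskInstanceState.QUEUED:
               msg = (
                   f"Executor reports task instance {ti} finished ({state}) although "
                   f"the task says its {ti.state}. (Info: {info}) "
                   "Was the task killed externally?"
               )
               ti.handle_failure(error=msg)  # the real code routes this through failure handling / callbacks
   ```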
   
   With the introduction of triggerers, there is a scenario where a task isn't actually stuck in queued: it is placed in queued again legitimately, because it was moved back to the scheduled state after its trigger instance completed.
   
   This is roughly how the task instance state flows for deferred tasks (a simplified sketch of steps 3 and 4 follows the list).
   
   1. The Celery execution of the task instance completes with Celery state success ([source](https://github.com/apache/airflow/blob/2.3.0/airflow/executors/celery_executor.py#L392-L393)).
   2. The trigger instance is assigned to a triggerer ([source](https://github.com/apache/airflow/blob/2.3.0/airflow/models/trigger.py#L186-L194)).
   3. The trigger instance completes and the task instance state is set to scheduled ([source](https://github.com/apache/airflow/blob/2.3.0/airflow/models/trigger.py#L109-L124)).
   4. The task instance key is removed from the running set and an executor event is added to the event buffer with task state success ([source](https://github.com/apache/airflow/blob/2.3.0/airflow/executors/base_executor.py#L261)) because the Celery execution returned with Celery state success.
   5. The scheduler processes the executor event of the deferred task instance (now in the scheduled state from step 3) with the success task state (from step 4).
   6. The task instance follows the task life cycle as normal, from scheduled -> queued.
   7. The task is executed on a worker.
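
   To make steps 3 and 4 concrete, the two pieces involved look roughly like the sketch below. This is my paraphrase of Trigger.submit_event and BaseExecutor.change_state from the linked 2.3.0 sources, heavily simplified (session handling, logging and the event payload plumbing are omitted).

   ```python
   from airflow.models.taskinstance import TaskInstance
   from airflow.utils.state import TaskInstanceState

   # Paraphrase of Trigger.submit_event (airflow/models/trigger.py, 2.3.0): when a
   # trigger fires, every deferred task instance waiting on it is detached from
   # the trigger and moved back to the SCHEDULED state (step 3).
   def submit_event(session, trigger_id):
       for task_instance in session.query(TaskInstance).filter(
           TaskInstance.trigger_id == trigger_id,
           TaskInstance.state == TaskInstanceState.DEFERRED,
       ):
           task_instance.trigger_id = None
           task_instance.state = TaskInstanceState.SCHEDULED

   # Paraphrase of BaseExecutor.change_state (airflow/executors/base_executor.py,
   # 2.3.0): the result of the first run of the task, which Celery reports as a
   # success because the run ended by deferring, is recorded in the event buffer
   # for the scheduler to pick up later (step 4).
   def change_state(self, key, state, info=None):
       self.running.discard(key)             # the original uses remove() inside try/except
       self.event_buffer[key] = state, info
   ```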
   
   However, depending on how long it takes the scheduler to get around to processing executor events in its loop, steps 5 and 6 can be flipped. In that case the task instance is already back in the queued state when the scheduler processes the executor event of the deferred task instance. Since the state is queued, the condition is met, and the task is marked as failed / up for retry.
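
   As a toy illustration of why the check fires in the flipped ordering (plain Python, no real Airflow objects; the states and try numbers are just stand-ins):

   ```python
   from dataclasses import dataclass

   @dataclass
   class FakeTaskInstance:
       state: str
       try_number: int

   # Step 4: the executor event from the first run of the task, which ended by
   # deferring, so Celery reported it as a success.
   event_state, event_try_number = "success", 1

   # Step 6 has already happened: the trigger completed, the task instance went
   # back through scheduled and is queued again, still on the same try number.
   ti = FakeTaskInstance(state="queued", try_number=1)

   # Step 5 happens late: the scheduler finally reads the event and runs the check.
   if ti.try_number == event_try_number and ti.state == "queued":
       print(f"Executor reports task finished ({event_state}) although the task "
             f"says it's queued -> marked failed / up for retry")
   ```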

