wolfier opened a new issue, #55004:
URL: https://github.com/apache/airflow/issues/55004

   ### Apache Airflow version
   
   3.0.5
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   A task instance was queued and the Celery executor submitted an executor 
event with `state=TaskInstanceState.QUEUED`. However, the scheduler did not 
[fetch the event's corresponding task 
instance](https://github.com/apache/airflow/blob/3.0.5/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L859)
 because its row was locked at the time. You may be wondering what locked the 
task instance when the scheduler ran the query; I do not think identifying the 
source is critical, as task instances are locked for various reasons in Airflow.
   
   Because no task instance was fetched, the [task instance's 
`external_executor_id` is not 
set](https://github.com/apache/airflow/blob/3.0.5/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L866-L869).
 Usually, the `external_executor_id` is **NOT** critical to the completion of 
the task execution; however, if the scheduler dies after the event has 
effectively been ignored, it becomes an issue: without an 
`external_executor_id`, [the orphaned task is **NOT** 
adopted](https://github.com/apache/airflow/blob/providers-celery/3.12.2/providers/celery/src/airflow/providers/celery/executors/celery_executor.py#L476-L480).
 [The task instance is 
reset](https://github.com/apache/airflow/blob/3.0.5/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L2256-L2259)
 instead.
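
   To make the failure mode concrete, here is a minimal sketch (not Airflow's 
actual code) of how a `SELECT ... FOR UPDATE SKIP LOCKED`-style fetch can 
silently miss a task instance whose row another transaction currently holds. 
The names `fetch_unlocked` and `locked_keys` are illustrative only:

```python
# Hypothetical sketch of SKIP LOCKED semantics; not Airflow source.
def fetch_unlocked(requested_keys, rows, locked_keys):
    """Mimic SELECT ... FOR UPDATE SKIP LOCKED: return only the requested
    rows that are not currently locked by another transaction."""
    return {k: rows[k] for k in requested_keys if k in rows and k not in locked_keys}

rows = {("dag", "task", "run", 1): {"external_executor_id": None}}
# The Celery executor reported QUEUED for this task instance ...
event_keys = [("dag", "task", "run", 1)]
# ... but some other transaction holds a lock on its row.
fetched = fetch_unlocked(event_keys, rows, locked_keys={("dag", "task", "run", 1)})
# Nothing comes back, so external_executor_id is never written to the row.
assert fetched == {}
```

Because the event is already consumed by this point, the empty result means 
the `QUEUED` event is simply dropped.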
   
   This means the following:
   * Task log is pushed to the remote log location
   * Task attempt is **NOT** updated in `task_instance_history`
   * Task log of the missing attempt is not displayed on the Airflow UI
   * Task is reset mid-execution and retried in a way the user does **NOT** 
expect
   
   ### What you think should happen instead?
   
   Instead of ignoring an executor event whose corresponding task instance is 
locked, the scheduler should return the event to the executor's event buffer 
so it is processed on the next scheduler loop. Currently, all executor events 
are drained from the event buffer before processing, so any ignored event is 
consumed and never retried.
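
   A hedged sketch of the proposed behavior (not a patch): events whose task 
instance could not be fetched are pushed back into the buffer so the next 
loop retries them. Here `event_buffer` stands in for the executor's dict of 
`{ti_key: (state, info)}`, and `handle`/`fetchable_keys` are hypothetical:

```python
processed = []

def handle(key, payload):
    """Stand-in for the scheduler's normal event-processing path."""
    processed.append(key)

def process_events(event_buffer, fetchable_keys):
    """Drain the buffer, process what we can, and re-buffer the rest."""
    events = dict(event_buffer)
    event_buffer.clear()
    for key, payload in events.items():
        if key in fetchable_keys:
            handle(key, payload)          # task instance row was fetchable
        else:
            event_buffer[key] = payload   # locked row: retry next loop

buf = {"ti_locked": ("queued", "id-1"), "ti_ok": ("queued", "id-2")}
process_events(buf, fetchable_keys={"ti_ok"})
assert processed == ["ti_ok"]
assert buf == {"ti_locked": ("queued", "id-1")}  # kept for the next loop
```

The key property is that an event for a locked row survives the loop instead 
of being consumed.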
   
   I want to highlight that `external_executor_id` matters when the scheduler 
attempts to adopt an orphaned task. The `external_executor_id` is used to 
query Celery for the Celery task and, only if that lookup succeeds, is the 
corresponding task instance adopted and added to the scheduler's running set 
(see 
[source](https://github.com/apache/airflow/blob/providers-celery/3.12.2/providers/celery/src/airflow/providers/celery/executors/celery_executor.py#L486-L506)
 for more information).
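
   An illustrative sketch of that adoption split (heavily simplified from the 
Celery executor's `try_adopt_task_instances`; the dict-based task instances 
and `celery_knows` set are stand-ins): only task instances carrying an 
`external_executor_id` can be matched to a Celery task and adopted, while the 
rest are handed back to be reset.

```python
# Simplified, hypothetical model of adoption; not the provider's real code.
def try_adopt(task_instances, celery_knows):
    adopted, not_adopted = [], []
    for ti in task_instances:
        eid = ti.get("external_executor_id")
        if eid and eid in celery_knows:   # Celery lookup succeeded
            adopted.append(ti)
        else:                             # no id: cannot even query Celery
            not_adopted.append(ti)
    return adopted, not_adopted

tis = [
    {"task_id": "a", "external_executor_id": "celery-123"},
    {"task_id": "b", "external_executor_id": None},  # the case in this issue
]
adopted, reset = try_adopt(tis, celery_knows={"celery-123"})
assert [t["task_id"] for t in adopted] == ["a"]
assert [t["task_id"] for t in reset] == ["b"]
```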
   
   
   
   ### How to reproduce
   
   1. Create a dag with a long-running task instance
   2. While the task runs, lock its task instance row by running a locking 
query
   3. Kill the scheduler and confirm the task instance is reset rather than 
adopted
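
   For step 2, a session like the following can hold the row lock (PostgreSQL 
shown; the `dag_id`/`task_id` values are placeholders for your own dag, and 
the transaction must stay open while you kill the scheduler):

```sql
BEGIN;
-- Hold a row lock on the running task instance; adjust the WHERE clause
-- to match the dag/task you created in step 1.
SELECT * FROM task_instance
WHERE dag_id = 'my_dag' AND task_id = 'long_task'
FOR UPDATE;
-- ... later, COMMIT or ROLLBACK to release the lock.
```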
   
   ### Operating System
   
   Debian
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

