wolfier opened a new issue, #55004: URL: https://github.com/apache/airflow/issues/55004
### Apache Airflow version

3.0.5

### If "Other Airflow 2 version" selected, which one?

_No response_

### What happened?

A task instance was queued, and the Celery executor submitted an executor event with `state=TaskInstanceState.QUEUED`. However, the scheduler did not [fetch the event's corresponding task instance](https://github.com/apache/airflow/blob/3.0.5/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L859) because the row was locked at the time. You may be wondering what locked the task instance when the scheduler ran the query; I do not think identifying the source is critical, since task instances are locked for various reasons in Airflow.

Because no task instance was fetched, [the task instance's `external_executor_id` is not set](https://github.com/apache/airflow/blob/3.0.5/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L866-L869). Usually, the `external_executor_id` is **NOT** critical to the completion of the task execution. However, if the scheduler dies after the event has effectively been ignored, it becomes a problem: without an `external_executor_id`, [the orphaned task is **NOT** adopted](https://github.com/apache/airflow/blob/providers-celery/3.12.2/providers/celery/src/airflow/providers/celery/executors/celery_executor.py#L476-L480). [The task instance is reset](https://github.com/apache/airflow/blob/3.0.5/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L2256-L2259) instead.

This means the following:

* The task log is pushed to the remote log location
* The task attempt is **NOT** recorded in `task_instance_history`
* The task log of the missing attempt is not displayed in the Airflow UI
* The task is reset mid-execution and retried in a way that is **NOT** expected by the user

### What you think should happen instead?

Instead of ignoring an executor event whose corresponding task instance is locked, the event should be returned to the executor's event buffer to be processed on the next scheduler loop.
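To illustrate the proposal, here is a minimal sketch (not Airflow's actual code) of draining the event buffer while re-queuing events whose task instance row could not be fetched. The `fetch_task_instance` callable is a hypothetical stand-in for the scheduler's row-locked query (`SELECT ... FOR UPDATE SKIP LOCKED`), returning `None` when the row is locked by another transaction:

```python
from collections import deque

def process_executor_events(event_buffer, fetch_task_instance):
    """Drain the buffer; re-queue events whose task instance is locked.

    Sketch only: `fetch_task_instance` stands in for the scheduler's
    skip-locked query and returns None when the row is locked.
    """
    requeue = []
    handled = []
    while event_buffer:
        key, state = event_buffer.popleft()
        ti = fetch_task_instance(key)
        if ti is None:
            # Row was locked -- keep the event for the next scheduler
            # loop instead of silently dropping it (current behavior).
            requeue.append((key, state))
        else:
            # Process normally, e.g. set external_executor_id.
            handled.append((key, state))
    event_buffer.extend(requeue)
    return handled

# Example: "ti2" is locked, so its event survives to the next loop.
buf = deque([("ti1", "queued"), ("ti2", "queued")])
locked = {"ti2"}
handled = process_executor_events(buf, lambda k: None if k in locked else object())
```

With this shape, a locked row delays processing of that one event by a scheduler loop rather than losing it outright.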
Currently, all executor events are removed from the event buffer before processing, so any ignored executor events are consumed and never retried.

I want to highlight that `external_executor_id` is important when the scheduler attempts to adopt an orphaned task. The `external_executor_id` is used to query Celery for the Celery task, and only if that query succeeds is the corresponding task instance adopted and added to the scheduler's running set (see [source](https://github.com/apache/airflow/blob/providers-celery/3.12.2/providers/celery/src/airflow/providers/celery/executors/celery_executor.py#L486-L506) for more information).

### How to reproduce

1. Create a dag with a long-running task instance
2. Lock the task instance by running a locking query
3. Kill the scheduler and confirm the task instance is reset

### Operating System

Debian

### Versions of Apache Airflow Providers

_No response_

### Deployment

Astronomer

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

---
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
