arkadiuszbach opened a new issue, #58570: URL: https://github.com/apache/airflow/issues/58570
### Apache Airflow version

3.1.3

### If "Other Airflow 2/3 version" selected, which one?

3.1.1

### What happened?

If the scheduler crashes after successfully placing a task on the Celery queue, but before receiving the queued event:

https://github.com/apache/airflow/blob/f969e6374daa8469938169be16a28f7c073a5ce9/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L852

then `external_executor_id` is never set, and on the next run `adopt_or_reset_orphaned_tasks` resets the task (`ti.state = None`). If the task was already running, it ends up being executed twice. The `/{task_instance_id}/run` API endpoint does not complain, because the task instance state is still `queued`:

https://github.com/apache/airflow/blob/f969e6374daa8469938169be16a28f7c073a5ce9/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py#L101

In Airflow 2.10.2, from what I can see, `external_executor_id` was set either by the scheduler or by the starting task via `check_and_change_state_before_execution`, so maybe `ti_run` should also set `external_executor_id` to prevent the task reset in this scenario?

https://github.com/apache/airflow/blob/35087d7d10714130cc3e9e9730e34b07fc56938d/airflow/jobs/local_task_job_runner.py#L155
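To illustrate the suggestion, here is a rough sketch (not the actual Airflow endpoint code) of what the `/run` handler could do: backfill `external_executor_id` while flipping the TI from `queued` to `running`, so that a later `adopt_or_reset_orphaned_tasks` pass still sees an executor id to adopt instead of resetting the task. It assumes the Airflow 3 `TaskInstance` model with its `id`, `state`, `hostname`, `pid` and `external_executor_id` columns; the function name and the `executor_id_from_worker` parameter are made up for the example, and the worker would still need some way to actually pass its Celery task id along, which I am handwaving here.

```python
# Rough sketch only -- not Airflow's real /run endpoint. Shows the idea of
# backfilling external_executor_id when a queued TI is moved to running, so
# adopt_or_reset_orphaned_tasks has an executor id and does not reset the task.
from sqlalchemy import func, update
from sqlalchemy.orm import Session

from airflow.models.taskinstance import TaskInstance as TI
from airflow.utils.state import TaskInstanceState


def mark_ti_running(
    session: Session,
    ti_id: str,
    hostname: str,
    pid: int,
    executor_id_from_worker: str | None = None,  # hypothetical parameter
) -> int:
    """Move a queued TI to RUNNING and, if still missing, record the executor id."""
    values = {
        "state": TaskInstanceState.RUNNING,
        "hostname": hostname,
        "pid": pid,
    }
    if executor_id_from_worker:
        # Keep whatever the scheduler already wrote; only fill the gap left by the
        # crash window described above.
        values["external_executor_id"] = func.coalesce(
            TI.external_executor_id, executor_id_from_worker
        )

    result = session.execute(
        update(TI)
        .where(TI.id == ti_id, TI.state == TaskInstanceState.QUEUED)
        .values(**values)
    )
    return result.rowcount
```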
API log:

```
# Initial start
2025-11-21T17:56:56.297808Z [debug    ] Starting task instance run [airflow.api_fastapi.execution_api.routes.task_instances] hostname=10.0.152.49 loc=task_instances.py:115 pid=175 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d unixname=airflow
2025-11-21T17:56:56.303505Z [debug    ] Retrieved task instance details [airflow.api_fastapi.execution_api.routes.task_instances] dag_id=test loc=task_instances.py:148 state=queued task_id=test_12 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
2025-11-21T17:56:56.303923Z [info     ] Task started [airflow.api_fastapi.execution_api.routes.task_instances] hostname=10.0.152.49 loc=task_instances.py:199 previous_state=queued ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
2025-11-21T17:56:56.306420Z [info     ] Task instance state updated [airflow.api_fastapi.execution_api.routes.task_instances] loc=task_instances.py:212 rows_affected=1 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d

# Heartbeat - task state is running
2025-11-21T17:58:56.984438Z [debug    ] Processing heartbeat [airflow.api_fastapi.execution_api.routes.task_instances] hostname=10.0.152.49 loc=task_instances.py:595 pid=175 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
2025-11-21T17:58:56.990311Z [debug    ] Retrieved current task state [airflow.api_fastapi.execution_api.routes.task_instances] current_hostname=10.0.152.49 current_pid=175 loc=task_instances.py:604 state=running ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
2025-11-21T17:58:56.992791Z [debug    ] Heartbeat updated [airflow.api_fastapi.execution_api.routes.task_instances] loc=task_instances.py:648 state=running ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
INFO: 10.0.152.49:47826 - "PUT /execution/task-instances/019aa78f-1e64-7faa-a5e2-b662a0b60f2d/heartbeat HTTP/1.1" 204 No Content

# Another start after adoption, previous_state=queued
2025-11-21T17:58:59.602695Z [debug    ] Starting task instance run [airflow.api_fastapi.execution_api.routes.task_instances] hostname=10.0.152.49 loc=task_instances.py:115 pid=195 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d unixname=airflow
2025-11-21T17:58:59.609789Z [debug    ] Retrieved task instance details [airflow.api_fastapi.execution_api.routes.task_instances] dag_id=test loc=task_instances.py:148 state=queued task_id=test_12 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
2025-11-21T17:58:59.610316Z [info     ] Task started [airflow.api_fastapi.execution_api.routes.task_instances] hostname=10.0.152.49 loc=task_instances.py:199 previous_state=queued ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
2025-11-21T17:58:59.613515Z [info     ] Task instance state updated [airflow.api_fastapi.execution_api.routes.task_instances] loc=task_instances.py:212 rows_affected=1 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
INFO: 10.0.152.49:47834 - "PATCH /execution/task-instances/019aa78f-1e64-7faa-a5e2-b662a0b60f2d/run HTTP/1.1" 200 OK
```

### What you think should happen instead?

_No response_

### How to reproduce

I was randomly killing DB connectivity and it happened; I was able to reproduce it by modifying `scheduler_job_runner.py`:

1. I added

   ```
   time.sleep(30)  # to make sure the task gets into the running state
   raise Exception()
   ```

   before:

   ```
   ti.external_executor_id = info
   cls.logger().info("Setting external_executor_id for %s to %s", ti, info)
   continue
   ```

2. Restarted the scheduler to trigger adoption.

### Operating System

Kubernetes

### Versions of Apache Airflow Providers

_No response_

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
