arkadiuszbach opened a new issue, #58570:
URL: https://github.com/apache/airflow/issues/58570

   ### Apache Airflow version
   
   3.1.3
   
   ### If "Other Airflow 2/3 version" selected, which one?
   
   3.1.1
   
   ### What happened?
   
   If the scheduler crashes after successfully placing a task on the Celery queue, but before it processes the queued event:
   
https://github.com/apache/airflow/blob/f969e6374daa8469938169be16a28f7c073a5ce9/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L852
   
   then `external_executor_id` is never set, and on the next scheduler start `adopt_or_reset_orphaned_tasks` resets the task (`ti.state = None`). If the task was already running on a worker, it ends up being executed twice.
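
   To illustrate the mechanism, here is a simplified sketch (helper names and structure are illustrative, not the actual Airflow implementation):

   ```
   def adopt_or_reset_orphaned_tis_sketch(orphaned_tis, executor):
       """Rough model of what happens to TIs owned by a dead scheduler job."""
       not_adopted = []
       for ti in orphaned_tis:
           if ti.external_executor_id is None:
               # No Celery task id was ever recorded, so there is nothing to
               # look up in the result backend -> the TI cannot be adopted.
               not_adopted.append(ti)
           else:
               # Adopted: the new scheduler keeps tracking the running task.
               executor.running.add(ti.key)
       for ti in not_adopted:
           # Reset so it gets scheduled again, even though a worker may
           # already be executing it -> the task can run twice.
           ti.state = None
   ```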
   
   The `/{task_instance_id}/run` execution API endpoint will not complain either, because after the reset the task instance state is `queued` again:
   
https://github.com/apache/airflow/blob/f969e6374daa8469938169be16a28f7c073a5ce9/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py#L101
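
   The state check there only sees that the task instance is `queued` again, so the second start looks like a normal first start. A minimal sketch of that behaviour (not the real route code; the allowed-states set is an assumption):

   ```
   ALLOWED_STATES = {"queued", "restarting"}  # assumption for this sketch

   def ti_run_sketch(ti, hostname, pid):
       # After the orphan reset the TI is "queued" again, so this check passes
       # and a second worker is allowed to start the same task instance.
       if ti.state not in ALLOWED_STATES:
           raise RuntimeError(f"TI not in a startable state: {ti.state}")
       previous_state = ti.state
       ti.state = "running"
       ti.hostname = hostname
       ti.pid = pid
       return {"previous_state": previous_state}
   ```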
   
   
   In Airflow 2.10.2, from what I can see, `external_executor_id` was set either by the scheduler or by the starting task via `check_and_change_state_before_execution`, so maybe `ti_run` should also set `external_executor_id` to prevent the task from being reset in this scenario?
   
https://github.com/apache/airflow/blob/35087d7d10714130cc3e9e9730e34b07fc56938d/airflow/jobs/local_task_job_runner.py#L155
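
   A rough sketch of what that could look like on the `ti_run` side (the payload field and overall shape are assumptions, not the existing API):

   ```
   def ti_run_with_external_id_sketch(ti, payload):
       # Hypothetical: if the scheduler crashed before persisting the id, let
       # the starting task report it, so adopt_or_reset_orphaned_tasks can
       # adopt the TI instead of resetting it on the next scheduler start.
       external_id = payload.get("external_executor_id")  # hypothetical field
       if ti.external_executor_id is None and external_id:
           ti.external_executor_id = external_id
       previous_state = ti.state
       ti.state = "running"
       return {"previous_state": previous_state}
   ```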
   
   
   
   API log:
   
   ```
   # Initial Start
   2025-11-21T17:56:56.297808Z [debug    ] Starting task instance run     [airflow.api_fastapi.execution_api.routes.task_instances] hostname=10.0.152.49 loc=task_instances.py:115 pid=175 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d unixname=airflow
   2025-11-21T17:56:56.303505Z [debug    ] Retrieved task instance details [airflow.api_fastapi.execution_api.routes.task_instances] dag_id=test loc=task_instances.py:148 state=queued task_id=test_12 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
   2025-11-21T17:56:56.303923Z [info     ] Task started                   [airflow.api_fastapi.execution_api.routes.task_instances] hostname=10.0.152.49 loc=task_instances.py:199 previous_state=queued ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
   2025-11-21T17:56:56.306420Z [info     ] Task instance state updated    [airflow.api_fastapi.execution_api.routes.task_instances] loc=task_instances.py:212 rows_affected=1 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d

   # Heartbeat - task state is running
   2025-11-21T17:58:56.984438Z [debug    ] Processing heartbeat           [airflow.api_fastapi.execution_api.routes.task_instances] hostname=10.0.152.49 loc=task_instances.py:595 pid=175 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
   2025-11-21T17:58:56.990311Z [debug    ] Retrieved current task state   [airflow.api_fastapi.execution_api.routes.task_instances] current_hostname=10.0.152.49 current_pid=175 loc=task_instances.py:604 state=running ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
   2025-11-21T17:58:56.992791Z [debug    ] Heartbeat updated              [airflow.api_fastapi.execution_api.routes.task_instances] loc=task_instances.py:648 state=running ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
   INFO:     10.0.152.49:47826 - "PUT /execution/task-instances/019aa78f-1e64-7faa-a5e2-b662a0b60f2d/heartbeat HTTP/1.1" 204 No Content

   # another start after adoption, previous_state=queued
   2025-11-21T17:58:59.602695Z [debug    ] Starting task instance run     [airflow.api_fastapi.execution_api.routes.task_instances] hostname=10.0.152.49 loc=task_instances.py:115 pid=195 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d unixname=airflow
   2025-11-21T17:58:59.609789Z [debug    ] Retrieved task instance details [airflow.api_fastapi.execution_api.routes.task_instances] dag_id=test loc=task_instances.py:148 state=queued task_id=test_12 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
   2025-11-21T17:58:59.610316Z [info     ] Task started                   [airflow.api_fastapi.execution_api.routes.task_instances] hostname=10.0.152.49 loc=task_instances.py:199 previous_state=queued ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
   2025-11-21T17:58:59.613515Z [info     ] Task instance state updated    [airflow.api_fastapi.execution_api.routes.task_instances] loc=task_instances.py:212 rows_affected=1 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
   INFO:     10.0.152.49:47834 - "PATCH /execution/task-instances/019aa78f-1e64-7faa-a5e2-b662a0b60f2d/run HTTP/1.1" 200 OK
   ```
   
   
   
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   I was killing DB connectivity randomly and the issue occurred; I was able to reproduce it deterministically by modifying `scheduler_job_runner.py`:
   
   1. I added
       ```
       time.sleep(30)  # to make sure the task gets into the running state
       raise Exception()
       ```
       before:
       ```
       ti.external_executor_id = info
       cls.logger().info("Setting external_executor_id for %s to %s", ti, info)
       continue
       ```
   2. Restarted scheduler to trigger adoption
   
   
   ### Operating System
   
   Kubernetes
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

