seanmuth opened a new issue, #64971:
URL: https://github.com/apache/airflow/issues/64971

   ### Description
   
   Generated with Claude; I wanted to open an issue to discuss this proposed fix before opening a PR.
   
   When a scheduler dies in the window between sending a task to Celery and processing the resulting `QUEUED` executor event, `external_executor_id` is never written to the `task_instance` table. When a new scheduler takes over and calls `adopt_or_reset_orphaned_tasks`, `CeleryExecutor.try_adopt_task_instances` cannot adopt the task, because it requires a non-None `external_executor_id` to construct the `AsyncResult` used for the Celery state lookup. The task is therefore reset and re-queued instead.
   
   ### Use case/motivation
   
   ### Root cause
   
   `external_executor_id` is only set as a side effect of `handle_executor_events` when it processes a `QUEUED` or `RUNNING` state event:

   ```python
   # scheduler_job_runner.py
   if state in (TaskInstanceState.QUEUED, TaskInstanceState.RUNNING):
       ti.external_executor_id = info
   ```
   
   The Celery task id (`result.task_id`) is available immediately after `apply_async()` returns in `_send_workloads()`, but it is only stored in the in-memory `event_buffer`:

   ```python
   # celery_executor.py
   self.event_buffer[key] = (TaskInstanceState.QUEUED, result.task_id)
   ```
   
   If the scheduler process dies before the next `handle_executor_events` cycle flushes that value to the DB, the task id is lost. The new scheduler's executor starts with an empty `workloads` dict and an empty event buffer, so there is no way to recover it.
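The loss can be sketched with a toy model (all names here are illustrative stand-ins, not the real Airflow classes): the Celery task id lives only in the executor's in-memory buffer, so a crash before the next event-processing cycle discards it, and a fresh executor has nothing to recover from.

```python
# Toy model of the race; ToyExecutor is an illustrative stand-in,
# not the real CeleryExecutor.
class ToyExecutor:
    def __init__(self):
        self.event_buffer = {}  # in-memory only, like CeleryExecutor.event_buffer

    def send_task(self, key, celery_task_id):
        # Mirrors _send_workloads(): the id is recorded in memory only.
        self.event_buffer[key] = ("queued", celery_task_id)

db_external_executor_id = None  # task_instance.external_executor_id, never written

old_scheduler = ToyExecutor()
old_scheduler.send_task("dag.task", "celery-task-1234")
# Scheduler crashes here, before handle_executor_events flushes to the DB.

new_scheduler = ToyExecutor()  # fresh process: empty buffer, nothing in the DB
recovered = new_scheduler.event_buffer.get("dag.task")
print(recovered, db_external_executor_id)  # None None -> task cannot be adopted
```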
   
   `try_adopt_task_instances` explicitly skips tasks without a task id (with a comment acknowledging this limitation):

   ```python
   # celery_executor.py
   for ti in tis:
       if ti.external_executor_id is not None:
           celery_tasks[ti.external_executor_id] = (AsyncResult(ti.external_executor_id), ti)
       else:
           not_adopted_tis.append(ti)  # cannot adopt: no Celery task_id to look up
   ```
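A self-contained sketch of this partitioning, with a stand-in for `TaskInstance` and a stubbed `AsyncResult` (the real one needs a Celery backend): any TI whose `external_executor_id` is `None` lands in the not-adopted bucket and will be reset.

```python
from dataclasses import dataclass

@dataclass
class FakeTaskInstance:           # illustrative stand-in for airflow TaskInstance
    key: str
    external_executor_id: object  # str or None

class FakeAsyncResult:            # illustrative stand-in for celery.result.AsyncResult
    def __init__(self, task_id):
        self.task_id = task_id

def partition_for_adoption(tis):
    """Mirror the adoption split in try_adopt_task_instances."""
    celery_tasks, not_adopted = {}, []
    for ti in tis:
        if ti.external_executor_id is not None:
            celery_tasks[ti.external_executor_id] = (FakeAsyncResult(ti.external_executor_id), ti)
        else:
            not_adopted.append(ti)  # no Celery task_id: will be reset, not adopted
    return celery_tasks, not_adopted

tis = [FakeTaskInstance("a", "id-1"), FakeTaskInstance("b", None)]
adoptable, to_reset = partition_for_adoption(tis)
print(sorted(adoptable), [ti.key for ti in to_reset])  # ['id-1'] ['b']
```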
   
     ### Observed behavior
   
   A task instance in `running` state owned by a dead scheduler job is reset instead of adopted, triggering a duplicate execution. The scheduler logs show:
   
   ```
     Reset the following 1 orphaned TaskInstances:
         <TaskInstance: dag.task scheduled__... [running]>
   ```
   
     with no prior "Setting external_executor_id" log for the task's first try.
   
     ### Expected behavior
   
     The Celery task_id should survive a scheduler restart. The new scheduler 
should be able to adopt the in-flight task and poll its Celery state rather 
than resetting it.
   
     ### Proposed fix
   
   Persist `external_executor_id` directly to the database in `_send_workloads()` immediately after a successful `apply_async()`, rather than waiting for it to be written as a side effect of event processing:

   ```python
   # celery_executor.py, in _send_workloads()
   elif result is not None:
       result.backend = cached_celery_backend
       self.running.add(key)
       self.workloads[key] = result
       self.event_buffer[key] = (TaskInstanceState.QUEUED, result.task_id)
       # Persist the Celery task_id immediately so it survives a scheduler restart.
       # Currently it is only written as a side effect of handle_executor_events,
       # which creates a window where a crash leaves external_executor_id as None
       # and prevents adoption by a new scheduler.
       self._persist_external_executor_id(key, result.task_id, session)
   ```
   
   This eliminates the race window between task submission and event processing. The `event_buffer` path can remain as-is for the normal (no-crash) flow; the DB write is an additional safety net.
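A minimal sketch of what the proposed `_persist_external_executor_id` helper could do. All of this is hypothetical: the real implementation would issue the `UPDATE` through the scheduler's SQLAlchemy session against the metadata DB, and the real `TaskInstanceKey` also carries `try_number`; here the DB is simulated with `sqlite3` and the key is modelled as a `(dag_id, task_id, run_id, map_index)` tuple.

```python
import sqlite3

# Stand-in for the metadata DB; real code would use the SQLAlchemy session.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("
    "dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER, "
    "external_executor_id TEXT)"
)
conn.execute("INSERT INTO task_instance VALUES ('dag', 'task', 'run_1', -1, NULL)")

def persist_external_executor_id(key, celery_task_id, conn):
    """Hypothetical helper: write the Celery task id for one TI key."""
    dag_id, task_id, run_id, map_index = key  # simplified key, no try_number
    conn.execute(
        "UPDATE task_instance SET external_executor_id = ? "
        "WHERE dag_id = ? AND task_id = ? AND run_id = ? AND map_index = ?",
        (celery_task_id, dag_id, task_id, run_id, map_index),
    )
    conn.commit()

persist_external_executor_id(("dag", "task", "run_1", -1), "celery-1234", conn)
row = conn.execute("SELECT external_executor_id FROM task_instance").fetchone()
print(row[0])  # celery-1234
```

With the id committed at submission time, a replacement scheduler can read it back from `task_instance` even though the old process's `event_buffer` is gone.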
   
     ### Notes
   
   - The existing code comment in `try_adopt_task_instances` already acknowledges this race condition as a known limitation; this fix resolves it.
   - A session would need to be threaded into `_send_workloads`, or the write issued via a short-lived session, depending on how the call chain is structured.
   - The `handle_executor_events` path that sets `external_executor_id` from `QUEUED`/`RUNNING` events should be kept as a reconciliation path for any case where the DB write is missed.
   
   ### Related issues
   
   https://github.com/apache/airflow/pull/37784
   
   ### Are you willing to submit a PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

